Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions src/Persistence/MartenTests/Dcb/dedup_load_boundary_frame_tests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
using IntegrationTests;
using JasperFx.Events;
using JasperFx.Events.Tags;
using JasperFx.Resources;
using Marten;
using Marten.Events;
using MartenTests.AncillaryStores;
using MartenTests.Dcb.University;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Npgsql;
using Shouldly;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Tracking;

namespace MartenTests.Dcb;

// Regression: two [BoundaryModel] parameters on the same chain (Validate +
// Handle) used to emit duplicate var declarations -> CS0128.
public record TwoBoundaryModelParamsCommand(StudentId StudentId, CourseId CourseId);

public static class TwoBoundaryModelParamsHandler
{
public static EventTagQuery Load(TwoBoundaryModelParamsCommand command)
=> EventTagQuery
.For(command.CourseId)
.AndEventsOfType<CourseCreated, CourseCapacityChanged, StudentSubscribedToCourse, StudentUnsubscribedFromCourse>()
.Or(command.StudentId)
.AndEventsOfType<StudentEnrolledInFaculty, StudentSubscribedToCourse, StudentUnsubscribedFromCourse>();

public static HandlerContinuation Validate(
TwoBoundaryModelParamsCommand command,
[BoundaryModel] SubscriptionState state,
ILogger logger)
{
if (state.StudentId == null)
{
logger.LogDebug("Student {StudentId} not enrolled", command.StudentId);
return HandlerContinuation.Stop;
}

return HandlerContinuation.Continue;
}

public static StudentSubscribedToCourse Handle(
TwoBoundaryModelParamsCommand command,
[BoundaryModel] SubscriptionState state)
{
return new StudentSubscribedToCourse(FacultyId.Default, command.StudentId, command.CourseId);
}
}

public class dedup_load_boundary_frame_tests : PostgresqlContext, IAsyncLifetime
{
private IHost theHost = null!;
private IDocumentStore theStore = null!;

public async Task InitializeAsync()
{
await using (var conn = new NpgsqlConnection(Servers.PostgresConnectionString))
{
await conn.OpenAsync();
await using var cmd = conn.CreateCommand();
cmd.CommandText = "DROP SCHEMA IF EXISTS dcb_dedup_tests CASCADE;";
await cmd.ExecuteNonQueryAsync();
}

theHost = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "dcb_dedup_tests";

m.Events.RegisterTagType<StudentId>("student")
.ForAggregate<SubscriptionState>();
m.Events.RegisterTagType<CourseId>("course")
.ForAggregate<SubscriptionState>();
m.Events.RegisterTagType<FacultyId>("faculty");

m.Projections.LiveStreamAggregation<SubscriptionState>();

m.Events.AddEventType<CourseCreated>();
m.Events.AddEventType<CourseCapacityChanged>();
m.Events.AddEventType<StudentEnrolledInFaculty>();
m.Events.AddEventType<StudentSubscribedToCourse>();
m.Events.AddEventType<StudentUnsubscribedFromCourse>();

m.Events.StreamIdentity = StreamIdentity.AsString;

m.DisableNpgsqlLogging = true;
})
.UseLightweightSessions()
.IntegrateWithWolverine();

opts.Discovery.DisableConventionalDiscovery()
.IncludeType(typeof(TwoBoundaryModelParamsHandler));
opts.Durability.Mode = DurabilityMode.Solo;
opts.Services.AddResourceSetupOnStartup();
}).StartAsync();

theStore = theHost.Services.GetRequiredService<IDocumentStore>();
}

public async Task DisposeAsync()
{
await theHost.StopAsync();
theHost.Dispose();
}

[Fact]
public async Task chain_with_two_boundary_model_parameters_compiles_and_runs()
{
var courseId = CourseId.Random();
var studentId = StudentId.Random();

await using (var session = theStore.LightweightSession())
{
var courseCreated = session.Events.BuildEvent(
new CourseCreated(FacultyId.Default, courseId, "Math 101", 10));
courseCreated.WithTag(courseId);
session.Events.Append(courseId.Value, courseCreated);

var enrolled = session.Events.BuildEvent(
new StudentEnrolledInFaculty(FacultyId.Default, studentId, "Alice", "Smith"));
enrolled.WithTag(studentId);
session.Events.Append(studentId.Value, enrolled);

await session.SaveChangesAsync();
}

// Pre-fix: this throws at handler-compilation with CS0128.
await theHost.InvokeMessageAndWaitAsync(
new TwoBoundaryModelParamsCommand(studentId, courseId));

await using var verifySession = theStore.LightweightSession();
var events = await verifySession.Events.QueryByTagsAsync(
new EventTagQuery().Or<StudentId>(studentId));

events.ShouldContain(e => e.Data is StudentSubscribedToCourse);
}
}
15 changes: 12 additions & 3 deletions src/Persistence/Wolverine.Marten/BoundaryModelAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public OnMissing OnMissing
set => _onMissing = value;
}

[UnconditionalSuppressMessage("Trimming", "IL2062",
Justification = "aggregateType originates from parameter.ParameterType; AOT consumers preserve it via DynamicDependency / source-generator registration.")]
[UnconditionalSuppressMessage("Trimming", "IL2065",
Justification = "MakeGenericType closes IEventBoundary<TAggregate>; GetProperty(nameof(IEventBoundary.Aggregate)) is statically referenced via nameof and the closed-generic IEventBoundary<TAggregate> preserves the Aggregate property by virtue of being instantiated by codegen. AOT consumers pre-generate via TypeLoadMode.Static.")]
[UnconditionalSuppressMessage("AOT", "IL3050",
Expand Down Expand Up @@ -85,9 +87,16 @@ public override Variable Modify(IChain chain, ParameterInfo parameter, IServiceC

new MartenPersistenceFrameProvider().ApplyTransactionSupport(chain, container);

// The EventTagQuery variable will be resolved lazily from the Load method's return value
var loader = new LoadBoundaryFrame(aggregateType);
chain.Middleware.Add(loader);
// One fetch per (chain, aggregate type). A second [BoundaryModel] of
// the same type (e.g. on Validate plus Handle) reuses the same frame,
// otherwise both emit identically-named "var" declarations -> CS0128.
var loader = chain.Middleware.OfType<LoadBoundaryFrame>()
.FirstOrDefault(f => f.AggregateType == aggregateType);
if (loader == null)
{
loader = new LoadBoundaryFrame(aggregateType);
chain.Middleware.Add(loader);
}

var boundary = loader.Boundary;

Expand Down
7 changes: 4 additions & 3 deletions src/Persistence/Wolverine.Marten/Codegen/LoadBoundaryFrame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@ namespace Wolverine.Marten.Codegen;

internal class LoadBoundaryFrame : AsyncFrame
{
private readonly Type _aggregateType;
private Variable? _query;
private Variable? _session;
private Variable? _token;
private readonly Type _boundaryType;

public LoadBoundaryFrame(Type aggregateType, Variable? query = null)
{
_aggregateType = aggregateType;
AggregateType = aggregateType;
_query = query;
_boundaryType = typeof(IEventBoundary<>).MakeGenericType(aggregateType);
Boundary = new Variable(_boundaryType, this);
}

public Type AggregateType { get; }

public Variable Boundary { get; }

public override IEnumerable<Variable> FindVariables(IMethodVariables chain)
Expand All @@ -42,7 +43,7 @@ public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)
{
writer.WriteComment("Loading DCB boundary model via FetchForWritingByTags");
writer.WriteLine(
$"var {Boundary.Usage} = await {_session!.Usage}.Events.FetchForWritingByTags<{_aggregateType.FullNameInCode()}>({_query!.Usage}, {_token!.Usage});");
$"var {Boundary.Usage} = await {_session!.Usage}.Events.FetchForWritingByTags<{AggregateType.FullNameInCode()}>({_query!.Usage}, {_token!.Usage});");

Next?.GenerateCode(method, writer);
}
Expand Down
16 changes: 14 additions & 2 deletions src/Persistence/Wolverine.Polecat/BoundaryModelAttribute.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics.CodeAnalysis;
using System.Reflection;
using JasperFx;
using JasperFx.CodeGeneration;
Expand Down Expand Up @@ -36,6 +37,12 @@ public OnMissing OnMissing
set => _onMissing = value;
}

[UnconditionalSuppressMessage("Trimming", "IL2062",
Justification = "aggregateType originates from parameter.ParameterType; AOT consumers preserve it via DynamicDependency / source-generator registration.")]
[UnconditionalSuppressMessage("Trimming", "IL2065",
Justification = "MakeGenericType closes IEventBoundary<TAggregate>; GetProperty(nameof(IEventBoundary.Aggregate)) is statically referenced via nameof and the closed-generic IEventBoundary<TAggregate> preserves the Aggregate property by virtue of being instantiated by codegen. AOT consumers pre-generate via TypeLoadMode.Static.")]
[UnconditionalSuppressMessage("AOT", "IL3050",
Justification = "MakeGenericType closes IEventBoundary<TAggregate> at codegen time; AOT consumers pre-generate via TypeLoadMode.Static.")]
public override Variable Modify(IChain chain, ParameterInfo parameter, IServiceContainer container,
GenerationRules rules)
{
Expand Down Expand Up @@ -73,8 +80,13 @@ public override Variable Modify(IChain chain, ParameterInfo parameter, IServiceC

new PolecatPersistenceFrameProvider().ApplyTransactionSupport(chain, container);

var loader = new LoadBoundaryFrame(aggregateType);
chain.Middleware.Add(loader);
var loader = chain.Middleware.OfType<LoadBoundaryFrame>()
.FirstOrDefault(f => f.AggregateType == aggregateType);
if (loader == null)
{
loader = new LoadBoundaryFrame(aggregateType);
chain.Middleware.Add(loader);
}

var boundary = loader.Boundary;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@ namespace Wolverine.Polecat.Codegen;

internal class LoadBoundaryFrame : AsyncFrame
{
private readonly Type _aggregateType;
private Variable? _query;
private Variable? _session;
private Variable? _token;
private readonly Type _boundaryType;

public LoadBoundaryFrame(Type aggregateType, Variable? query = null)
{
_aggregateType = aggregateType;
AggregateType = aggregateType;
_query = query;
_boundaryType = typeof(IEventBoundary<>).MakeGenericType(aggregateType);
Boundary = new Variable(_boundaryType, this);
}

public Type AggregateType { get; }

public Variable Boundary { get; }

public override IEnumerable<Variable> FindVariables(IMethodVariables chain)
Expand All @@ -42,7 +43,7 @@ public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)
{
writer.WriteComment("Loading DCB boundary model via FetchForWritingByTags");
writer.WriteLine(
$"var {Boundary.Usage} = await {_session!.Usage}.Events.FetchForWritingByTags<{_aggregateType.FullNameInCode()}>({_query!.Usage}, {_token!.Usage});");
$"var {Boundary.Usage} = await {_session!.Usage}.Events.FetchForWritingByTags<{AggregateType.FullNameInCode()}>({_query!.Usage}, {_token!.Usage});");

Next?.GenerateCode(method, writer);
}
Expand Down
Loading