From 3a7a9a2033f7e66efc2081df0d5e385d66116d21 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Thu, 4 Jun 2026 13:25:26 -0500 Subject: [PATCH 1/2] Phase-1b tenant-partitioned aggregate matrix: exclusive write, empty-no-write, MartenOps tenant overloads, DeliveryOptions routing, AlwaysEnforceConsistency (#3021) Test-only. Extends the Conjoined + Quick + UseTenantPartitionedEvents single-store fixture (string identity) with the remaining single-store aggregate-handler scenarios that persist without AutoApplyTransactions: - exclusive-write concurrency ([AggregateHandler(ConcurrencyStyle.Exclusive)] -> FetchForExclusiveWriting) appends to the routed tenant and stays isolated - empty handler result makes no write (stream + aggregate unchanged) - MartenOps.Store / Insert / Delete tenant overloads land in the targeted tenant's document partition vs default, invoked with no ambient tenant - DeliveryOptions{TenantId} routes the message to that tenant's partition - AlwaysEnforceConsistency detects a concurrent write on the same (tenant) stream -> ConcurrencyException The IEventStream handler-parameter return shape is intentionally omitted: a compound handler that loads an IEventStream via FetchForWriting and appends gets no SaveChanges without AutoApplyTransactions (silently dropped), unlike [AggregateHandler]/IMartenOp returns. Filed as #3032. Co-Authored-By: Claude Opus 4.8 (1M context) --- ...nt_partitioned_aggregate_matrix_phase1b.cs | 217 ++++++++++++++++++ 1 file changed, 217 insertions(+) create mode 100644 src/Persistence/MartenTests/AggregateHandlerWorkflow/tenant_partitioned_aggregate_matrix_phase1b.cs diff --git a/src/Persistence/MartenTests/AggregateHandlerWorkflow/tenant_partitioned_aggregate_matrix_phase1b.cs b/src/Persistence/MartenTests/AggregateHandlerWorkflow/tenant_partitioned_aggregate_matrix_phase1b.cs new file mode 100644 index 000000000..2d207d9ee --- /dev/null +++ b/src/Persistence/MartenTests/AggregateHandlerWorkflow/tenant_partitioned_aggregate_matrix_phase1b.cs @@ -0,0 +1,217 @@ +using JasperFx; +using JasperFx.Events; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Tracking; + +namespace MartenTests.AggregateHandlerWorkflow; + +// Phase 1 (cont., follow-up #3021): the remaining single-store aggregate-handler matrix that reuses +// the Conjoined + Quick + UseTenantPartitionedEvents fixture (string identity) — exclusive-write +// concurrency, empty-result→no-write, IEventStream handler parameter, MartenOps tenant document +// overloads, DeliveryOptions{TenantId} routing, and AlwaysEnforceConsistency. Each scenario asserts +// the work lands in (and stays isolated to) the routed tenant partition. +public class tenant_partitioned_aggregate_matrix_phase1b : PostgresqlContext, IAsyncLifetime +{ + private IHost theHost = null!; + private IDocumentStore theStore = null!; + + public async Task InitializeAsync() + { + theHost = await PartitionedTenancyHost.StartAsync(StreamIdentity.AsString, + "tpe_p1b_" + Guid.NewGuid().ToString("N"), + m => + { + m.Schema.For().MultiTenanted(); + m.Projections.Snapshot(SnapshotLifecycle.Inline); + m.Schema.For().MultiTenanted(); + }, + typeof(Phase1bHandlers), typeof(TenantLedgerHandler), typeof(ConsistentTallyHandler)); + + theStore = theHost.Services.GetRequiredService(); + } + + public async Task DisposeAsync() + { + await theHost.StopAsync(); + theHost.Dispose(); + } + + private async Task SeedTally(string tenant, string id) + { + await using var session = theStore.LightweightSession(tenant); + session.Events.StartStream(id, new TallyIncremented(0)); + await session.SaveChangesAsync(); + } + + private async Task LoadTally(string tenant, string id) + { + await using var session = theStore.LightweightSession(tenant); + return await session.LoadAsync(id); + } + + private static string UniqueId(string p) => $"{p}-{Guid.NewGuid():N}"; + + [Fact] + public async Task exclusive_write_appends_to_the_routed_tenant_and_stays_isolated() + { + var id = UniqueId("excl"); + await SeedTally("tenant1", id); + await SeedTally("tenant2", id); + + await theHost.InvokeMessageAndWaitAsync(new ExclusiveIncrement(id, 8), "tenant1"); + + (await LoadTally("tenant1", id))!.Total.ShouldBe(8); + (await LoadTally("tenant2", id))!.Total.ShouldBe(0); + } + + [Fact] + public async Task empty_result_makes_no_write() + { + var id = UniqueId("noop"); + await SeedTally("tenant1", id); + + await theHost.InvokeMessageAndWaitAsync(new NoOpTally(id), "tenant1"); + + // Still just the seed event — no new append, aggregate unchanged. + await using var session = theStore.LightweightSession("tenant1"); + (await session.Events.FetchStreamAsync(id)).Count.ShouldBe(1); + (await LoadTally("tenant1", id))!.Total.ShouldBe(0); + } + + // NOTE: the IEventStream handler-parameter return shape is deliberately NOT covered here. A + // compound handler that loads an IEventStream via FetchForWriting and appends to it does not + // get Marten transaction support (no SaveChanges) without opts.Policies.AutoApplyTransactions() — + // so the append is silently dropped, unlike [AggregateHandler]/IMartenOp returns (the latter + // self-persist since #3025). This fixture intentionally runs without AutoApplyTransactions; the + // gap is filed as #3032. Bug_225 pins the working path (AutoApplyTransactions on). + + [Fact] + public async Task martenops_store_tenant_overload_lands_in_that_tenant_partition() + { + // Invoked with NO ambient tenant — the op's tenant overload selects the partition. + var id = UniqueId("led"); + await theHost.InvokeMessageAndWaitAsync(new StoreLedgerForTenant(id, "tenant1", 42)); + + (await LoadLedger("tenant1", id))!.Value.ShouldBe(42); + (await LoadLedger("tenant2", id)).ShouldBeNull(); + (await LoadLedger(StorageConstants.DefaultTenantId, id)).ShouldBeNull(); + } + + [Fact] + public async Task martenops_insert_then_delete_tenant_overloads_target_that_tenant() + { + var id = UniqueId("led2"); + await theHost.InvokeMessageAndWaitAsync(new InsertLedgerForTenant(id, "tenant2", 7)); + (await LoadLedger("tenant2", id))!.Value.ShouldBe(7); + (await LoadLedger("tenant1", id)).ShouldBeNull(); + + await theHost.InvokeMessageAndWaitAsync(new DeleteLedgerForTenant(id, "tenant2")); + (await LoadLedger("tenant2", id)).ShouldBeNull(); + } + + [Fact] + public async Task delivery_options_tenant_id_routes_to_that_partition() + { + var id = UniqueId("deliv"); + await SeedTally("tenant1", id); + + await theHost.TrackActivity().ExecuteAndWaitAsync(c => + c.PublishAsync(new DeliveryRoutedIncrement(id, 5), new DeliveryOptions { TenantId = "tenant1" })); + + (await LoadTally("tenant1", id))!.Total.ShouldBe(5); + (await LoadTally("tenant2", id)).ShouldBeNull(); + } + + [Fact] + public async Task always_enforce_consistency_detects_concurrent_write_in_the_tenant_partition() + { + var id = UniqueId("aec"); + await SeedTally("tenant1", id); + + // The handler emits no events but a concurrent writer advances the same (tenant1) stream + // mid-flight; AlwaysEnforceConsistency must still raise on SaveChanges, scoped to the partition. + await Should.ThrowAsync(() => + theHost.MessageBus().InvokeForTenantAsync("tenant1", new ConsistentNoOpTally(id))); + } + + private async Task LoadLedger(string tenant, string id) + { + await using var session = theStore.LightweightSession(tenant); + return await session.LoadAsync(id); + } +} + +public record ExclusiveIncrement(string TenantTallyId, int Amount); + +public record NoOpTally(string TenantTallyId); + +public record DeliveryRoutedIncrement(string TenantTallyId, int Amount); + +public static class Phase1bHandlers +{ + [AggregateHandler(Wolverine.Marten.ConcurrencyStyle.Exclusive)] + public static IEnumerable Handle(ExclusiveIncrement command, TenantTally tally) + { + yield return new TallyIncremented(command.Amount); + } + + [AggregateHandler] + public static IEnumerable Handle(NoOpTally command, TenantTally tally) + { + yield break; + } + + [AggregateHandler] + public static IEnumerable Handle(DeliveryRoutedIncrement command, TenantTally tally) + { + yield return new TallyIncremented(command.Amount); + } +} + +public class TenantLedger +{ + public string Id { get; set; } = null!; + public int Value { get; set; } +} + +public record StoreLedgerForTenant(string Id, string Tenant, int Value); + +public record InsertLedgerForTenant(string Id, string Tenant, int Value); + +public record DeleteLedgerForTenant(string Id, string Tenant); + +public static class TenantLedgerHandler +{ + public static IMartenOp Handle(StoreLedgerForTenant command) + => MartenOps.Store(new TenantLedger { Id = command.Id, Value = command.Value }, command.Tenant); + + public static IMartenOp Handle(InsertLedgerForTenant command) + => MartenOps.Insert(new TenantLedger { Id = command.Id, Value = command.Value }, command.Tenant); + + public static IMartenOp Handle(DeleteLedgerForTenant command) + => MartenOps.Delete(new TenantLedger { Id = command.Id }, command.Tenant); +} + +public record ConsistentNoOpTally(string TenantTallyId); + +public static class ConsistentTallyHandler +{ + [AggregateHandler(AlwaysEnforceConsistency = true)] + public static async Task> Handle( + ConsistentNoOpTally command, TenantTally tally, IDocumentStore store, Envelope envelope) + { + // Simulate a concurrent writer on the same (tenant) stream between FetchForWriting and SaveChanges. + await using var session = store.LightweightSession(envelope.TenantId!); + session.Events.Append(command.TenantTallyId, new TallyIncremented(1)); + await session.SaveChangesAsync(); + + return Array.Empty(); + } +} From a949c876f195659279d0cc57374a85c0a66efe73 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Thu, 4 Jun 2026 13:34:49 -0500 Subject: [PATCH 2/2] Natural-key aggregate isolation under UseTenantPartitionedEvents (#3021) Test-only. Reuses the NkOrderAggregate / NkHandlerOrderNumber / NkOrderHandler types from natural_key_aggregate_handler_workflow.cs against the partitioned fixture (Guid stream identity, string natural key). Pins that: - the same natural-key value in two tenants resolves to the routed tenant's own stream (FetchForWriting is scoped to the tenant partition, not cross-tenant) - multi-event returns append to the routed tenant - the [WriteAggregate] IEventStream handler (CompleteNkOrder) appends to the routed tenant and stays isolated -- and persists without AutoApplyTransactions, confirming the aggregate-workflow IEventStream path is unaffected by the compound-loader gap (#3032) Co-Authored-By: Claude Opus 4.8 (1M context) --- ...enant_partitioned_natural_key_aggregate.cs | 112 ++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 src/Persistence/MartenTests/AggregateHandlerWorkflow/tenant_partitioned_natural_key_aggregate.cs diff --git a/src/Persistence/MartenTests/AggregateHandlerWorkflow/tenant_partitioned_natural_key_aggregate.cs b/src/Persistence/MartenTests/AggregateHandlerWorkflow/tenant_partitioned_natural_key_aggregate.cs new file mode 100644 index 000000000..7e83b3908 --- /dev/null +++ b/src/Persistence/MartenTests/AggregateHandlerWorkflow/tenant_partitioned_natural_key_aggregate.cs @@ -0,0 +1,112 @@ +using JasperFx.Events; +using JasperFx.Events.Projections; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Tracking; +using Xunit; + +namespace MartenTests.AggregateHandlerWorkflow; + +// Phase 1 (cont., #3021): natural-key aggregate isolation under UseTenantPartitionedEvents. Reuses the +// NkOrderAggregate / NkHandlerOrderNumber / NkOrderHandler types from natural_key_aggregate_handler_workflow.cs. +// The aggregate keys streams by Guid but is fetched by a string natural key (NkHandlerOrderNumber) via +// FetchForWriting; the same natural-key value in two tenants must resolve to that tenant's +// own stream (the natural-key lookup is scoped to the routed tenant partition). +public class tenant_partitioned_natural_key_aggregate : PostgresqlContext, IAsyncLifetime +{ + private IHost theHost = null!; + private IDocumentStore theStore = null!; + + public async Task InitializeAsync() + { + theHost = await PartitionedTenancyHost.StartAsync(StreamIdentity.AsGuid, + "tpe_nk_" + Guid.NewGuid().ToString("N"), + m => + { + m.Schema.For().MultiTenanted(); + m.Projections.Snapshot(SnapshotLifecycle.Inline); + }, + typeof(NkOrderHandler)); + + theStore = theHost.Services.GetRequiredService(); + } + + public async Task DisposeAsync() + { + await theHost.StopAsync(); + theHost.Dispose(); + } + + // Seed a fresh order stream in the given tenant via a direct tenant session. + private async Task SeedOrder(string tenant, string orderNo, string customer) + { + var streamId = Guid.NewGuid(); + await using var session = theStore.LightweightSession(tenant); + session.Events.StartStream(streamId, + new NkHandlerOrderCreated(new NkHandlerOrderNumber(orderNo), customer)); + await session.SaveChangesAsync(); + return streamId; + } + + private async Task Load(string tenant, Guid streamId) + { + await using var session = theStore.LightweightSession(tenant); + return await session.LoadAsync(streamId); + } + + private static string UniqueOrderNo() => "ORD-" + Guid.NewGuid().ToString("N")[..8]; + + [Fact] + public async Task same_natural_key_in_two_tenants_resolves_to_the_routed_tenant_stream() + { + var orderNo = UniqueOrderNo(); + var id1 = await SeedOrder("tenant1", orderNo, "Alice"); + var id2 = await SeedOrder("tenant2", orderNo, "Bob"); // same natural key, different tenant + + // Routed to tenant1: the natural-key fetch must find tenant1's order, not tenant2's. + await theHost.InvokeMessageAndWaitAsync( + new AddNkOrderItem(new NkHandlerOrderNumber(orderNo), "Widget", 9.99m), "tenant1"); + + var t1 = await Load("tenant1", id1); + t1!.CustomerName.ShouldBe("Alice"); + t1.TotalAmount.ShouldBe(9.99m); + + // tenant2's same-natural-key order is untouched. + var t2 = await Load("tenant2", id2); + t2!.CustomerName.ShouldBe("Bob"); + t2.TotalAmount.ShouldBe(0m); + } + + [Fact] + public async Task natural_key_multi_event_return_appends_to_the_routed_tenant() + { + var orderNo = UniqueOrderNo(); + var id1 = await SeedOrder("tenant1", orderNo, "Alice"); + await SeedOrder("tenant2", orderNo, "Bob"); + + await theHost.InvokeMessageAndWaitAsync( + new AddNkOrderItems(new NkHandlerOrderNumber(orderNo), + [("Gadget", 19.99m), ("Doohickey", 5.50m)]), "tenant1"); + + (await Load("tenant1", id1))!.TotalAmount.ShouldBe(25.49m); + } + + [Fact] + public async Task natural_key_event_stream_handler_completes_in_the_routed_tenant() + { + // CompleteNkOrder's handler appends via a [WriteAggregate] IEventStream — the + // aggregate-workflow path (which applies transaction support, unlike the loader path in #3032). + var orderNo = UniqueOrderNo(); + var id1 = await SeedOrder("tenant1", orderNo, "Alice"); + var id2 = await SeedOrder("tenant2", orderNo, "Bob"); + + await theHost.InvokeMessageAndWaitAsync( + new CompleteNkOrder(new NkHandlerOrderNumber(orderNo)), "tenant1"); + + (await Load("tenant1", id1))!.IsComplete.ShouldBeTrue(); + (await Load("tenant2", id2))!.IsComplete.ShouldBeFalse(); // isolated + } +}