Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using JasperFx;
using JasperFx.Events;
using JasperFx.Events.Projections;
using Marten;
Expand All @@ -6,15 +7,20 @@
using Shouldly;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Persistence;
using Wolverine.Tracking;

namespace MartenTests.AggregateHandlerWorkflow;

// Phase 1 of the per-tenant-partitioned-events matrix (follow-up #3021): the single-store
// aggregate-handler scenarios beyond the foundational slice — [ReadAggregate], every append
// return shape, and [WriteAggregate] — each scoped by tenant against a Conjoined + Quick +
// aggregate-handler scenarios beyond the foundational slice — [ReadAggregate], every append return
// shape, [WriteAggregate] (optimistic + required-with-throw isolation), optimistic version checks,
// and cascading-message tenant inheritance — each scoped by tenant against a Conjoined + Quick +
// UseTenantPartitionedEvents store (string identity). Reuses TenantTally / PartitionedTenancyHost
// from tenant_partitioned_events_aggregate_workflow.cs.
//
// Streams are seeded via a direct tenant session because MartenOps.StartStream from a handler is
// silently dropped under UseTenantPartitionedEvents (GH-3025).
public class tenant_partitioned_aggregate_matrix : PostgresqlContext, IAsyncLifetime
{
private IHost theHost = null!;
Expand All @@ -29,7 +35,7 @@ public async Task InitializeAsync()
m.Schema.For<TenantTally>().MultiTenanted();
m.Projections.Snapshot<TenantTally>(SnapshotLifecycle.Inline);
},
typeof(TenantTallyHandler), typeof(PhaseOneMatrixHandlers));
typeof(TenantTallyHandler), typeof(PhaseOneMatrixHandlers), typeof(RequiredTallyHandler), typeof(VersionedTallyHandler), typeof(CascadingTallyHandler));

theStore = theHost.Services.GetRequiredService<IDocumentStore>();
}
Expand All @@ -40,8 +46,15 @@ public async Task DisposeAsync()
theHost.Dispose();
}

private Task SeedAsync(string tenant, string id)
=> theHost.InvokeMessageAndWaitAsync(new StartTally(id), tenant);
// Seed via a direct tenant session. NOT via the StartTally handler / MartenOps.StartStream — that
// side-effect is silently dropped under UseTenantPartitionedEvents (GH-3025), so it would leave no
// stream and the version/required scenarios below could not see a pre-existing aggregate.
private async Task SeedAsync(string tenant, string id)
{
await using var session = theStore.LightweightSession(tenant);
session.Events.StartStream<TenantTally>(id, new TallyIncremented(0));
await session.SaveChangesAsync();
}

private async Task<TenantTally?> LoadAsync(string tenant, string id)
{
Expand Down Expand Up @@ -123,6 +136,58 @@ public async Task write_aggregate_appends_to_the_routed_tenant_and_stays_isolate
// The same stream id, routed to tenant2, sees no aggregate (separate partition).
(await LoadAsync("tenant2", id)).ShouldBeNull();
}

[Fact]
public async Task required_write_aggregate_missing_in_the_routed_tenant_throws()
{
// Aggregate exists in tenant1; the same Required=true command routed to tenant2 must not find
// it there (partition isolation) and must raise RequiredDataMissingException rather than
// silently starting a tenant2 stream.
var id = UniqueId("req");
await SeedAsync("tenant1", id);

await theHost.InvokeMessageAndWaitAsync(new RequireAppendTally(id, 3), "tenant1");
(await LoadAsync("tenant1", id))!.Total.ShouldBe(3);

await Should.ThrowAsync<RequiredDataMissingException>(() =>
theHost.MessageBus().InvokeForTenantAsync("tenant2", new RequireAppendTally(id, 3)));
(await LoadAsync("tenant2", id)).ShouldBeNull();
}

[Fact]
public async Task optimistic_version_check_is_scoped_to_the_tenant_partition()
{
// StartTally appends one event, so each tenant's stream is at version 1.
var id = UniqueId("ver");
await SeedAsync("tenant1", id);
await SeedAsync("tenant2", id);

await theHost.InvokeMessageAndWaitAsync(new IncrementWithVersion(id, 1, 5), "tenant1");
(await LoadAsync("tenant1", id))!.Total.ShouldBe(5);
(await LoadAsync("tenant2", id))!.Total.ShouldBe(0);

await Should.ThrowAsync<ConcurrencyException>(() =>
theHost.MessageBus().InvokeForTenantAsync("tenant1", new IncrementWithVersion(id, 99, 1)));

await theHost.InvokeMessageAndWaitAsync(new IncrementWithVersion(id, 1, 7), "tenant2");
(await LoadAsync("tenant2", id))!.Total.ShouldBe(7);
}

[Fact]
public async Task cascaded_message_inherits_the_routed_tenant()
{
// A message cascaded from an aggregate handler must carry the handler's tenant (the outbox
// session is tenant-scoped) — otherwise per-tenant follow-on work silently misroutes.
var id = UniqueId("casc");
await SeedAsync("tenant1", id);

var marker = Guid.NewGuid();
await theHost.InvokeMessageAndWaitAsync(new AppendAndCascade(id, 4, marker), "tenant1");

(await LoadAsync("tenant1", id))!.Total.ShouldBe(4);
CascadingTallyHandler.CascadeTenants.TryGetValue(marker, out var tenant);
tenant.ShouldBe("tenant1");
}
}

public record ViewTally(string TenantTallyId);
Expand Down Expand Up @@ -166,3 +231,38 @@ public static async IAsyncEnumerable<object> Handle(IncrementViaAsync command, T
public static Events Handle(AppendToTally command, [WriteAggregate(Required = false)] TenantTally? tally)
=> new Events(new object[] { new TallyIncremented(command.Amount) });
}

public record RequireAppendTally(string TenantTallyId, int Amount);

public static class RequiredTallyHandler
{
public static Events Handle(RequireAppendTally command,
[WriteAggregate(Required = true, OnMissing = OnMissing.ThrowException)] TenantTally tally)
=> new Events(new object[] { new TallyIncremented(command.Amount) });
}

public record IncrementWithVersion(string TenantTallyId, long ExpectedVersion, int Amount);

[AggregateHandler(VersionSource = nameof(IncrementWithVersion.ExpectedVersion))]
public static class VersionedTallyHandler
{
public static TallyIncremented Handle(IncrementWithVersion command, TenantTally tally)
=> new(command.Amount);
}

public record AppendAndCascade(string TenantTallyId, int Amount, Guid Marker);

public record TallyCascaded(Guid Marker);

public static class CascadingTallyHandler
{
public static readonly System.Collections.Concurrent.ConcurrentDictionary<Guid, string?> CascadeTenants = new();

[AggregateHandler]
public static (Events, OutgoingMessages) Handle(AppendAndCascade command, TenantTally tally)
=> (new Events(new object[] { new TallyIncremented(command.Amount) }),
new OutgoingMessages { new TallyCascaded(command.Marker) });

public static void Handle(TallyCascaded message, Envelope envelope)
=> CascadeTenants[message.Marker] = envelope.TenantId;
}
Loading