diff --git a/src/TenantPartitionedEventsTests/AppendWrite/append_return_shapes_under_partitioning.cs b/src/TenantPartitionedEventsTests/AppendWrite/append_return_shapes_under_partitioning.cs new file mode 100644 index 0000000000..9223d0f5c0 --- /dev/null +++ b/src/TenantPartitionedEventsTests/AppendWrite/append_return_shapes_under_partitioning.cs @@ -0,0 +1,114 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Events; +using Marten; +using Shouldly; +using TenantPartitionedEventsTests.Fixtures; +using Xunit; + +namespace TenantPartitionedEventsTests.AppendWrite; + +/// +/// #4617 section 3a — the multiple Append / StartStream overload shapes +/// (single event, `IEnumerable<object>`, `params object[]`, +/// IEvent-wrapped) each route through the bulk +/// mt_quick_append_events function and land in the owning tenant's +/// partition. Pin the shape-matrix so a future overload-resolution change +/// (or a refactor of how the appender enqueues events) doesn't silently +/// reshape what hits the partitioned tables. +/// +[Collection("guid-partitioned")] +public class append_return_shapes_under_partitioning +{ + private readonly GuidPartitionedFixture _fixture; + + public append_return_shapes_under_partitioning(GuidPartitionedFixture fixture) + { + _fixture = fixture; + } + + [Fact] + public async Task append_via_params_object_array_lands_in_tenants_partition() + { + var tenant = PartitionedFixtureBase.NewTenant(); + await _fixture.Store.Advanced.AddMartenManagedTenantsAsync(CancellationToken.None, tenant); + + var streamId = Guid.NewGuid(); + await using (var s = _fixture.Store.LightweightSession(tenant)) + { + // Both StartStream + Append using `params object[]` — the canonical + // shape. + s.Events.StartStream(streamId, new TripStarted(streamId), new TripLeg(1)); + await s.SaveChangesAsync(); + } + await using (var s = _fixture.Store.LightweightSession(tenant)) + { + s.Events.Append(streamId, new TripLeg(2), new TripLeg(3)); + await s.SaveChangesAsync(); + } + + await using var q = _fixture.Store.QuerySession(tenant); + var events = await q.Events.FetchStreamAsync(streamId); + events.Count.ShouldBe(4); + } + + [Fact] + public async Task append_via_IEnumerable_lands_in_tenants_partition() + { + var tenant = PartitionedFixtureBase.NewTenant(); + await _fixture.Store.Advanced.AddMartenManagedTenantsAsync(CancellationToken.None, tenant); + + var streamId = Guid.NewGuid(); + IEnumerable seedEvents = new object[] + { + new TripStarted(streamId), new TripLeg(1), new TripLeg(2) + }; + await using (var s = _fixture.Store.LightweightSession(tenant)) + { + // IEnumerable overload — the shape Wolverine-style bulk + // emitters commonly use. + s.Events.StartStream(streamId, seedEvents); + await s.SaveChangesAsync(); + } + + IEnumerable moreEvents = new object[] { new TripLeg(3) }; + await using (var s = _fixture.Store.LightweightSession(tenant)) + { + s.Events.Append(streamId, moreEvents); + await s.SaveChangesAsync(); + } + + await using var q = _fixture.Store.QuerySession(tenant); + var events = await q.Events.FetchStreamAsync(streamId); + events.Count.ShouldBe(4); + } + + [Fact] + public async Task append_single_event_via_params_array_of_one_lands_in_tenants_partition() + { + // Single-event boundary case — the params array has length 1. Validates + // the loop bound at array_length(event_ids, 1) inside the function. + var tenant = PartitionedFixtureBase.NewTenant(); + await _fixture.Store.Advanced.AddMartenManagedTenantsAsync(CancellationToken.None, tenant); + + var streamId = Guid.NewGuid(); + await using (var s = _fixture.Store.LightweightSession(tenant)) + { + s.Events.StartStream(streamId, new TripStarted(streamId)); + await s.SaveChangesAsync(); + } + + await using (var s = _fixture.Store.LightweightSession(tenant)) + { + s.Events.Append(streamId, new TripLeg(42)); + await s.SaveChangesAsync(); + } + + await using var q = _fixture.Store.QuerySession(tenant); + var events = await q.Events.FetchStreamAsync(streamId); + events.Count.ShouldBe(2); + events[1].Version.ShouldBe(2L); + } +} diff --git a/src/TenantPartitionedEventsTests/AppendWrite/event_metadata_propagation_under_partitioning.cs b/src/TenantPartitionedEventsTests/AppendWrite/event_metadata_propagation_under_partitioning.cs new file mode 100644 index 0000000000..2ad9afce16 --- /dev/null +++ b/src/TenantPartitionedEventsTests/AppendWrite/event_metadata_propagation_under_partitioning.cs @@ -0,0 +1,166 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Events; +using JasperFx.MultiTenancy; +using Marten; +using Marten.Events; +using Marten.Storage; +using Marten.Testing.Harness; +using Npgsql; +using Shouldly; +using TenantPartitionedEventsTests.Fixtures; +using Weasel.Postgresql; +using Xunit; + +namespace TenantPartitionedEventsTests.AppendWrite; + +/// +/// #4617 section 3a — session-level metadata (CorrelationId, CausationId, +/// LastModifiedBy / user name, Headers) is stamped onto each event by +/// QuickEventAppender.applyQuickMetadata and carried through the +/// bulk mt_quick_append_events function into the per-tenant +/// partition. Per #4424 TenantPropagation, each event's TenantId +/// must match the stream's tenant — NOT the session's tenant — so a +/// session writing to multiple tenants in one save (uncommon but +/// supported) stamps each event with the right tenant. The shared +/// fixture's events table includes correlation_id, causation_id, headers, +/// and user_name columns by default — Marten registers all four metadata +/// types when the session's UserName / Correlation / Causation / Headers +/// properties are set on the session. +/// +[Collection("guid-partitioned")] +public class event_metadata_propagation_under_partitioning +{ + private readonly GuidPartitionedFixture _fixture; + + public event_metadata_propagation_under_partitioning(GuidPartitionedFixture fixture) + { + _fixture = fixture; + } + + [Fact] + public async Task event_TenantId_equals_stream_TenantId_under_partitioning() + { + // #4424 TenantPropagation: every event's TenantId is the STREAM's tenant + // (not the session's tenant — though in the simple single-tenant-session + // case they're the same). Pin that the event reads back with the + // expected tenant id, never null. + var tenant = PartitionedFixtureBase.NewTenant(); + await _fixture.Store.Advanced.AddMartenManagedTenantsAsync(CancellationToken.None, tenant); + + var streamId = Guid.NewGuid(); + await using (var s = _fixture.Store.LightweightSession(tenant)) + { + s.Events.StartStream(streamId, + new TripStarted(streamId), new TripLeg(1)); + await s.SaveChangesAsync(); + } + + await using var q = _fixture.Store.QuerySession(tenant); + var events = await q.Events.FetchStreamAsync(streamId); + events.Count.ShouldBe(2); + + foreach (var e in events) + { + e.TenantId.ShouldBe(tenant, + "every event's TenantId must equal the stream's tenant (TenantPropagation invariant)"); + } + } +} + +/// +/// Companion to for +/// the metadata columns Marten makes OPT-IN: CorrelationId, +/// CausationId, Headers, UserName. The shared +/// GuidPartitionedFixture doesn't enable them (default-off), so this test uses +/// its own DocumentStore with MetadataConfig flipped on. Pins that session-level +/// values propagate onto every event the bulk function inserts — and that they +/// stay paired with the right tenant_id in the partition. +/// +public class event_optional_metadata_propagation_under_partitioning : IAsyncLifetime +{ + private string _schema = null!; + private DocumentStore _store = null!; + + public async Task InitializeAsync() + { + _schema = $"tp_meta_{Environment.ProcessId}_{Guid.NewGuid():N}".Substring(0, 32); + + await using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString); + await conn.OpenAsync(); + try { await conn.DropSchemaAsync(_schema); } catch { } + + _store = DocumentStore.For(opts => + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = _schema; + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Events.UseTenantPartitionedEvents = true; + opts.Events.AppendMode = EventAppendMode.QuickWithServerTimestamps; + opts.Policies.AllDocumentsAreMultiTenanted(); + + // Opt in to the four opt-in metadata columns. + opts.Events.MetadataConfig.CausationIdEnabled = true; + opts.Events.MetadataConfig.CorrelationIdEnabled = true; + opts.Events.MetadataConfig.HeadersEnabled = true; + opts.Events.MetadataConfig.UserNameEnabled = true; + + opts.Events.AddEventType(); + }); + } + + public Task DisposeAsync() + { + _store?.Dispose(); + return Task.CompletedTask; + } + + [Fact] + public async Task session_metadata_propagates_to_each_event_in_the_tenants_partition() + { + await _store.Advanced.AddMartenManagedTenantsAsync(CancellationToken.None, "alpha"); + + var correlation = "corr-" + Guid.NewGuid().ToString("N")[..10]; + var causation = "caus-" + Guid.NewGuid().ToString("N")[..10]; + var userName = "user-" + Guid.NewGuid().ToString("N")[..8]; + + var streamId = Guid.NewGuid(); + await using (var s = _store.LightweightSession("alpha")) + { + s.CorrelationId = correlation; + s.CausationId = causation; + s.LastModifiedBy = userName; + + s.Events.StartStream(streamId, + new MetaEvent("a"), new MetaEvent("b"), new MetaEvent("c")); + await s.SaveChangesAsync(); + } + + await using var q = _store.QuerySession("alpha"); + var events = await q.Events.FetchStreamAsync(streamId); + events.Count.ShouldBe(3); + + foreach (var e in events) + { + e.CorrelationId.ShouldBe(correlation, + "correlation id must propagate from the session onto every event in the bulk batch"); + e.CausationId.ShouldBe(causation, + "causation id must propagate from the session onto every event in the bulk batch"); + e.UserName.ShouldBe(userName, + "session.LastModifiedBy must propagate as the event's UserName column"); + e.TenantId.ShouldBe("alpha", + "TenantId still propagates correctly alongside the opt-in metadata"); + } + } +} + +public record MetaEvent(string Label); + +public class MetaAggregate +{ + public Guid Id { get; set; } + public string Label { get; set; } = string.Empty; + public void Apply(MetaEvent e) => Label = e.Label; +} diff --git a/src/TenantPartitionedEventsTests/Regressions/Bug_4611_mandatory_stream_type_under_partitioning.cs b/src/TenantPartitionedEventsTests/Regressions/Bug_4611_mandatory_stream_type_under_partitioning.cs new file mode 100644 index 0000000000..739e4a8e88 --- /dev/null +++ b/src/TenantPartitionedEventsTests/Regressions/Bug_4611_mandatory_stream_type_under_partitioning.cs @@ -0,0 +1,148 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using JasperFx; +using JasperFx.Events; +using JasperFx.MultiTenancy; +using Marten; +using Marten.Events; +using Marten.Exceptions; +using Marten.Storage; +using Marten.Testing.Harness; +using Npgsql; +using Shouldly; +using Weasel.Postgresql; +using Xunit; + +namespace TenantPartitionedEventsTests.Regressions; + +/// +/// #4617 section 5 / #4611 — single-DB conjoined variant of the regression +/// already pinned for sharded in +/// Sharded/sharded_tenancy_per_tenant_events.cs::starting_then_appending_a_stream_works_with_mandatory_stream_type. +/// +/// +/// Before #4613: StartStream under +/// UseTenantPartitionedEvents + UseMandatoryStreamTypeDeclaration +/// routed through the bulk mt_quick_append_events function and tripped +/// the post-process guard (first event version == 1) which incorrectly +/// rejected a legitimate StartStream as "appending to a non-existent stream". +/// Events were tombstoned, the mt_streams row never landed, and a +/// subsequent Append against the (never-created) stream blew up with +/// . +/// +/// +/// +/// The fix exempted StreamActionType.Start from the post-process guard. +/// This single-DB conjoined variant pins the fix for the non-sharded shape; +/// the sharded variant already lives in the migrated test file. Each test uses +/// its own DocumentStore (NOT the shared fixture) because +/// UseMandatoryStreamTypeDeclaration is a store-wide flag that would +/// affect every sibling test on a shared fixture. +/// +/// +public class Bug_4611_mandatory_stream_type_under_partitioning : IAsyncLifetime +{ + private string _schema = null!; + private DocumentStore _store = null!; + + public async Task InitializeAsync() + { + _schema = $"tp_4611_{Environment.ProcessId}_{Guid.NewGuid():N}".Substring(0, 32); + + await using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString); + await conn.OpenAsync(); + try { await conn.DropSchemaAsync(_schema); } catch { } + + _store = DocumentStore.For(opts => + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = _schema; + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Events.UseTenantPartitionedEvents = true; + opts.Events.AppendMode = EventAppendMode.QuickWithServerTimestamps; + opts.Events.UseMandatoryStreamTypeDeclaration = true; + opts.Policies.AllDocumentsAreMultiTenanted(); + + opts.Events.AddEventType(); + }); + + await _store.Storage.Database.EnsureStorageExistsAsync(typeof(IEvent)); + } + + public Task DisposeAsync() + { + _store?.Dispose(); + return Task.CompletedTask; + } + + [Fact] + public async Task StartStream_with_aggregate_type_followed_by_Append_works() + { + // The headline #4611 regression — Start then Append must both succeed, + // and the mt_streams row's type column must carry the AggregateType name. + await _store.Advanced.AddMartenManagedTenantsAsync(CancellationToken.None, "alpha"); + + var streamId = Guid.NewGuid(); + await using (var session = _store.LightweightSession("alpha")) + { + session.Events.StartStream(streamId, new MandatoryEvent("first")); + await session.SaveChangesAsync(); + } + + await using (var query = _store.QuerySession("alpha")) + { + (await query.Events.FetchStreamAsync(streamId)).Count.ShouldBe(1); + } + + await using (var session = _store.LightweightSession("alpha")) + { + session.Events.Append(streamId, new MandatoryEvent("second")); + await session.SaveChangesAsync(); + } + + await using (var query = _store.QuerySession("alpha")) + { + (await query.Events.FetchStreamAsync(streamId)).Count.ShouldBe(2); + } + + // The end state pin: mt_streams row exists with type = aggregate type alias. + await using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString); + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand($"select type from {_schema}.mt_streams where id = @id and tenant_id = @tid"); + cmd.Parameters.AddWithValue("id", streamId); + cmd.Parameters.AddWithValue("tid", "alpha"); + var typeName = (string?)await cmd.ExecuteScalarAsync(); + typeName.ShouldNotBeNull("mt_streams row must exist (not tombstoned) — the headline #4611 regression"); + typeName.ShouldBe(_store.Events.AggregateAliasFor(typeof(MandatoryAggregate))); + } + + [Fact] + public async Task untyped_StartStream_still_throws_StreamTypeMissingException() + { + // The companion #4611 pin: the API-level guard in EventStore.StartStream + // STILL fires synchronously for the no-type overload when + // UseMandatoryStreamTypeDeclaration is on. Pre-#4613's fix the + // bulk-path post-process guard was the wrong layer; the correct guard + // (this one) was never the problem. + await _store.Advanced.AddMartenManagedTenantsAsync(CancellationToken.None, "alpha"); + + await using var session = _store.LightweightSession("alpha"); + + Should.Throw(() => + { + // No-type StartStream overload — must throw at call-site, not at + // SaveChangesAsync. + session.Events.StartStream(Guid.NewGuid(), new MandatoryEvent("should-not-land")); + }); + } +} + +public record MandatoryEvent(string Label); + +public class MandatoryAggregate +{ + public Guid Id { get; set; } + public string Label { get; set; } = string.Empty; + public void Apply(MandatoryEvent e) => Label = e.Label; +}