From dd1596a8ff1fc6808b0c5979b0487e5ed456f17d Mon Sep 17 00:00:00 2001 From: Anne Erdtsieck Date: Wed, 3 Jun 2026 11:07:16 +0200 Subject: [PATCH 1/2] Fix #4611: StartStream + mandatory stream type + per-tenant partitioned events When UseTenantPartitionedEvents is enabled, StartStream actions are routed through the bulk mt_quick_append_events operation (QuickEventAppender). The PostprocessAsync guard that rejects appending to a non-existent stream under UseMandatoryStreamTypeDeclaration (a new stream's first event comes back as version 1) also fired for a legitimate StartStream of a brand-new stream, throwing NonExistentStreamException. The events were then tombstoned, so a later append to the never-created stream failed as well. Only treat version 1 as an error for Append actions, not Start. --- .../QuickAppendEventsOperationBase.cs | 12 ++++- ...sharded_tenancy_per_tenant_events_tests.cs | 53 ++++++++++++++++++- 2 files changed, 63 insertions(+), 2 deletions(-) diff --git a/src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs b/src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs index 1bc796bf0a..616863e837 100644 --- a/src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs +++ b/src/Marten/Events/Operations/QuickAppendEventsOperationBase.cs @@ -293,7 +293,17 @@ public async Task PostprocessAsync(DbDataReader reader, IList excepti events[i - 1].Sequence = values[i]; } - if (Events is { UseMandatoryStreamTypeDeclaration: true } && events[0].Version == 1) + // UseMandatoryStreamTypeDeclaration rejects appending to a stream that does not + // exist yet (its first event would come back as version 1). But when + // UseTenantPartitionedEvents is on, StartStream actions are also routed through this + // bulk-append operation (see QuickEventAppender.registerOperationsForStreams), and a + // legitimate StartStream of a brand-new stream likewise produces version 1. Only an + // Append — not a Start — to a non-existent stream is the case this guard is meant to + // catch, so exclude StartStream actions here. Without this, a partitioned StartStream + // throws NonExistentStreamException and the events get tombstoned (#4611). + if (Events is { UseMandatoryStreamTypeDeclaration: true } + && events[0].Version == 1 + && Stream.ActionType != StreamActionType.Start) { throw new NonExistentStreamException(Events.StreamIdentity == StreamIdentity.AsGuid ? Stream.Id diff --git a/src/MultiTenancyTests/sharded_tenancy_per_tenant_events_tests.cs b/src/MultiTenancyTests/sharded_tenancy_per_tenant_events_tests.cs index 466a73b639..90b7421118 100644 --- a/src/MultiTenancyTests/sharded_tenancy_per_tenant_events_tests.cs +++ b/src/MultiTenancyTests/sharded_tenancy_per_tenant_events_tests.cs @@ -63,7 +63,7 @@ public Task DisposeAsync() return Task.CompletedTask; } - private IDocumentStore CreateStore(Action? customConfig = null) + private IDocumentStore CreateStore(Action? customConfig = null, Action? storeConfig = null) { _store = DocumentStore.For(opts => { @@ -92,6 +92,8 @@ private IDocumentStore CreateStore(Action? customConfig = opts.Events.UseTenantPartitionedEvents = true; opts.Events.AddEventType(); + + storeConfig?.Invoke(opts); }); return _store; @@ -117,6 +119,47 @@ public async Task append_event_for_runtime_provisioned_tenant_does_not_throw_42P await session.SaveChangesAsync(); } + // #4611 — With UseTenantPartitionedEvents, StartStream actions are routed through the bulk + // mt_quick_append_events operation. Combined with UseMandatoryStreamTypeDeclaration, the + // post-process guard that rejects appends to a non-existent stream (first event => version 1) + // wrongly fired for a legitimate StartStream (also version 1), throwing NonExistentStreamException + // and tombstoning the events. A later append to the (never-created) stream then failed too. + [Fact] + public async Task starting_then_appending_a_stream_works_with_mandatory_stream_type() + { + CreateStore(x => x.UseSmallestDatabaseAssignment(), + opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.Events.UseMandatoryStreamTypeDeclaration = true; + }); + await _store.Advanced.AddTenantToShardAsync("india", CancellationToken.None); + + var streamId = Guid.NewGuid().ToString(); + + await using (var session = _store.LightweightSession("india")) + { + session.Events.StartStream(streamId, new ShardedTestEvent { Value = "first" }); + await session.SaveChangesAsync(); + } + + await using (var query = _store.QuerySession("india")) + { + (await query.Events.FetchStreamAsync(streamId)).Count.ShouldBe(1); + } + + await using (var session = _store.LightweightSession("india")) + { + session.Events.Append(streamId, new ShardedTestEvent { Value = "second" }); + await session.SaveChangesAsync(); + } + + await using (var query = _store.QuerySession("india")) + { + (await query.Events.FetchStreamAsync(streamId)).Count.ShouldBe(2); + } + } + [Fact] public async Task per_tenant_event_sequence_lives_in_the_assigned_shard() { @@ -285,3 +328,11 @@ private static async Task assertSequenceDoesNotExist(string connectionString, st exists.ShouldBe(0L, because); } } + +public class ShardedAggregate +{ + public string Id { get; set; } = string.Empty; + public int Count { get; set; } + + public void Apply(ShardedTestEvent _) => Count++; +} From 827a25149de3cc96ea8221a7377828b29e30129a Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 3 Jun 2026 06:31:34 -0500 Subject: [PATCH 2/2] Strengthen #4611 test: assert mt_streams row + add untyped-StartStream guard test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two additions to the regression test for #4611: 1. The existing test now asserts the end state the fix is supposed to produce: the mt_streams row exists in the assigned shard for the started stream, and its `type` column is populated with the aggregate-type alias. This is the surface the original bug actually broke — events landed in mt_events but the StartStream got post-process-tombstoned, so the mt_streams row was never created (which then made every subsequent Append throw NonExistentStreamException). 2. New companion test under the same sharded + UseTenantPartitionedEvents + UseMandatoryStreamTypeDeclaration config that calls the no-type StartStream overloads and asserts they still throw StreamTypeMissingException synchronously. The fix in QuickAppendEventsOperationBase only relaxes the post-process "appended to a non-existent stream" guard for Start actions (Stream.ActionType != Append) — it does not, and must not, weaken the API-level guard in EventStore.StartStream that rejects untyped StartStream when UseMandatoryStreamTypeDeclaration is on. This test pins that guarantee so a future change to the bulk-path guard can't accidentally let untyped streams through. Pre-fix: the strengthened end-state test fails with the originally-reported NonExistentStreamException; the untyped-StartStream test passes (the API-level guard fires before the bulk path). Post-fix: both pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...sharded_tenancy_per_tenant_events_tests.cs | 68 ++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/src/MultiTenancyTests/sharded_tenancy_per_tenant_events_tests.cs b/src/MultiTenancyTests/sharded_tenancy_per_tenant_events_tests.cs index 90b7421118..86d08304f7 100644 --- a/src/MultiTenancyTests/sharded_tenancy_per_tenant_events_tests.cs +++ b/src/MultiTenancyTests/sharded_tenancy_per_tenant_events_tests.cs @@ -4,10 +4,12 @@ using System.Threading; using System.Threading.Tasks; using JasperFx; +using JasperFx.Core; using JasperFx.Events; using JasperFx.MultiTenancy; using Marten; using Marten.Events; +using Marten.Exceptions; using Marten.Storage; using Marten.Testing.Documents; using Marten.Testing.Harness; @@ -133,7 +135,7 @@ public async Task starting_then_appending_a_stream_works_with_mandatory_stream_t opts.Events.StreamIdentity = StreamIdentity.AsString; opts.Events.UseMandatoryStreamTypeDeclaration = true; }); - await _store.Advanced.AddTenantToShardAsync("india", CancellationToken.None); + var dbId = await _store.Advanced.AddTenantToShardAsync("india", CancellationToken.None); var streamId = Guid.NewGuid().ToString(); @@ -148,6 +150,18 @@ public async Task starting_then_appending_a_stream_works_with_mandatory_stream_t (await query.Events.FetchStreamAsync(streamId)).Count.ShouldBe(1); } + // End-state assertion that #4611 specifically broke: the mt_streams row must + // exist with the AggregateType name set. The original bug let the events land + // in mt_events but the post-process guard tombstoned the StartStream, so the + // mt_streams row never landed — that's why a later Append threw NonExistentStream. + await assertStreamRowExistsWithType( + _fixture.ConnectionStrings[dbId!], + streamId, + tenantId: "india", + // EventGraph.AggregateAliasFor for a non-generic type = type.Name.ToTableAlias() + // — i.e. "ShardedAggregate" → "sharded_aggregate". + expectedType: typeof(ShardedAggregate).Name.ToTableAlias()); + await using (var session = _store.LightweightSession("india")) { session.Events.Append(streamId, new ShardedTestEvent { Value = "second" }); @@ -160,6 +174,58 @@ public async Task starting_then_appending_a_stream_works_with_mandatory_stream_t } } + // #4611 follow-up — pin that the mandatory-stream-type check still fires for + // untyped StartStream under the same sharded + UseTenantPartitionedEvents config. + // The fix in QuickAppendEventsOperationBase only relaxes the post-process guard + // for Start actions; the API-level guard in EventStore.StartStream must still + // reject the no-type overload up front (synchronously, before SaveChangesAsync), + // so this combination doesn't quietly accept untyped streams. + [Fact] + public async Task untyped_StartStream_still_throws_when_mandatory_stream_type_is_on() + { + CreateStore(x => x.UseSmallestDatabaseAssignment(), + opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.Events.UseMandatoryStreamTypeDeclaration = true; + }); + await _store.Advanced.AddTenantToShardAsync("juliet", CancellationToken.None); + + await using var session = _store.LightweightSession("juliet"); + + // The no-type StartStream overload — should be rejected immediately by the + // EventStore.StartStream guard, before any queuing or SaveChanges happens. + Should.Throw(() => + { + session.Events.StartStream( + Guid.NewGuid().ToString(), + new object[] { new ShardedTestEvent { Value = "should-not-land" } }); + }); + + // The params-array overload too — same enforcement path. + Should.Throw(() => + { + session.Events.StartStream( + Guid.NewGuid().ToString(), + new ShardedTestEvent { Value = "should-not-land" }); + }); + } + + private static async Task assertStreamRowExistsWithType( + string connectionString, string streamId, string tenantId, string expectedType) + { + await using var conn = new NpgsqlConnection(connectionString); + await conn.OpenAsync(); + var typeName = (string?)await conn.CreateCommand( + "select type from public.mt_streams where id = :id and tenant_id = :tid") + .With("id", streamId) + .With("tid", tenantId) + .ExecuteScalarAsync(); + typeName.ShouldNotBeNull( + $"mt_streams row for stream '{streamId}' (tenant '{tenantId}') must exist in {connectionString} — its absence is the #4611 regression surface"); + typeName.ShouldBe(expectedType); + } + [Fact] public async Task per_tenant_event_sequence_lives_in_the_assigned_shard() {