From 41fae05bf2f81a92936c41d2745db184ab8b5a63 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 9 Jun 2026 11:50:38 -0500 Subject: [PATCH] =?UTF-8?q?Fix=20#4705=20=E2=80=94=20composite=20projectio?= =?UTF-8?q?n=20shards=20stall=20at=20seq=201=20under=20per-tenant=20event?= =?UTF-8?q?=20partitioning?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: a composite projection runs an "optimized rebuild" via CompositeReplayExecutor on startup, whose ceiling comes from IEventDatabase.FetchHighestEventSequenceNumber(). Marten implemented that as `select last_value from mt_events_sequence` — the store-global sequence, which is NEVER advanced under UseTenantPartitionedEvents (each tenant draws seq_id from its own mt_events_sequence_{suffix}). So it read as 1 and the composite replayed only events 0..1, then parked at seq 1. Standalone projections were immune: their continuous agent is driven by the high-water detector (HighWaterMark, computed from max(seq_id)), not that method. Fix: under UseTenantPartitionedEvents, FetchHighestEventSequenceNumber reads `coalesce(max(seq_id), 0) from mt_events` — the real high-water — so the composite replay covers the whole stream. Also made FetchEventStoreStatistics.EventSequenceNumber partition-aware for consistency (the two "highest sequence" APIs now agree; previously both read the stale sequence). The non-partitioned path is unchanged (still mt_events_sequence.last_value). Note: the projection VERSION is irrelevant (the reporter's stalled shards merely happened to be versioned); the guard reproduces the stall at both v1 and v2. Tests: - Regressions/Bug_4705_versioned_composite_per_tenant: single-DB, single-tenant continuous-daemon guard at v1 AND v2 — composite (bundle + members) and a standalone control must both reach the high-water. - Admin/event_store_statistics_under_partitioning: the two pins that documented the stale-by-design behavior are updated to the corrected contract (FetchHighest / FetchMax / stats.EventSequenceNumber all agree on max(seq_id) under partitioning). Verified: TenantPartitionedEventsTests 183/183; non-partitioned DaemonTests.Composites 10/10. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../Storage/MartenDatabase.EventStorage.cs | 25 ++- ...ent_store_statistics_under_partitioning.cs | 54 +++-- ...Bug_4705_versioned_composite_per_tenant.cs | 191 ++++++++++++++++++ 3 files changed, 236 insertions(+), 34 deletions(-) create mode 100644 src/TenantPartitionedEventsTests/Regressions/Bug_4705_versioned_composite_per_tenant.cs diff --git a/src/Marten/Storage/MartenDatabase.EventStorage.cs b/src/Marten/Storage/MartenDatabase.EventStorage.cs index 513272c450..d7b4d45cac 100644 --- a/src/Marten/Storage/MartenDatabase.EventStorage.cs +++ b/src/Marten/Storage/MartenDatabase.EventStorage.cs @@ -117,9 +117,19 @@ public async Task FetchHighestEventSequenceNumber(CancellationToken token await conn.OpenAsync(token).ConfigureAwait(false); try { - var highest = (long)await conn - .CreateCommand($"select last_value from {Options.Events.DatabaseSchemaName}.mt_events_sequence;") - .ExecuteScalarAsync(token).ConfigureAwait(false)!; + // #4705: under per-tenant event partitioning the store-global mt_events_sequence is + // never advanced -- every tenant's events draw seq_id from its own + // mt_events_sequence_{suffix}, so the global sequence's last_value is stale (reads as 1). + // Callers use this as a high-water ceiling (e.g. the composite single-pass replay + // executor); reading the stale 1 made composite shards replay only events 0..1 and + // stall. Read the real maximum from the events table instead in that mode. + var sql = Options.Events.UseTenantPartitionedEvents + ? $"select coalesce(max(seq_id), 0) from {Options.Events.DatabaseSchemaName}.mt_events;" + : $"select last_value from {Options.Events.DatabaseSchemaName}.mt_events_sequence;"; + + var highest = (long)(await conn + .CreateCommand(sql) + .ExecuteScalarAsync(token).ConfigureAwait(false))!; return highest; } @@ -167,10 +177,17 @@ public async Task FetchHighestEventSequenceNumber(CancellationToken token public async Task FetchEventStoreStatistics( CancellationToken token = default) { + // #4705: under per-tenant partitioning the store-global mt_events_sequence is never advanced, + // so EventSequenceNumber would be stale (reads as 1). Read max(seq_id) in that mode so it stays + // consistent with FetchHighestEventSequenceNumber / FetchMaxEventSequenceAsync. + var highWaterSql = Options.Events.UseTenantPartitionedEvents + ? $"select coalesce(max(seq_id), 0) from {Options.Events.DatabaseSchemaName}.mt_events;" + : $"select last_value from {Options.Events.DatabaseSchemaName}.mt_events_sequence;"; + var sql = $@" select count(*) from {Options.Events.DatabaseSchemaName}.mt_events; select count(*) from {Options.Events.DatabaseSchemaName}.mt_streams; -select last_value from {Options.Events.DatabaseSchemaName}.mt_events_sequence; +{highWaterSql} "; await EnsureStorageExistsAsync(typeof(IEvent), token).ConfigureAwait(false); diff --git a/src/TenantPartitionedEventsTests/Admin/event_store_statistics_under_partitioning.cs b/src/TenantPartitionedEventsTests/Admin/event_store_statistics_under_partitioning.cs index 4277408e21..3d6ba1090f 100644 --- a/src/TenantPartitionedEventsTests/Admin/event_store_statistics_under_partitioning.cs +++ b/src/TenantPartitionedEventsTests/Admin/event_store_statistics_under_partitioning.cs @@ -66,39 +66,32 @@ public async Task FetchEventStoreStatistics_EventCount_and_StreamCount_reflect_a } [Fact] - public async Task FetchEventStoreStatistics_EventSequenceNumber_is_STALE_under_partitioning() + public async Task FetchHighestEventSequenceNumber_tracks_high_water_under_partitioning() { - // The pin: the global mt_events_sequence is NEVER nextval'd under - // partitioning (per-tenant sequences are used instead). The - // EventSequenceNumber field of statistics reads `last_value` from that - // dead global sequence, so it stays at its starting value regardless - // of how many events have been appended store-wide. - // - // This is by-design — but monitoring tools that historically used - // EventSequenceNumber as the event-store high-water need to switch to - // FetchMaxEventSequenceAsync under partitioning. Pin the broken-by-design - // value so the divergence is part of the documented contract. + // #4705: the store-global mt_events_sequence is never nextval'd under partitioning (per-tenant + // sequences are used), so reading its last_value returned a stale 1. That stale value, used as a + // ceiling by the composite single-pass replay executor, made composite shards stall at seq 1. + // FetchHighestEventSequenceNumber (and FetchEventStoreStatistics.EventSequenceNumber) now read + // max(seq_id) under partitioning, so they report the real high-water and agree with + // FetchMaxEventSequenceAsync. var tenant = PartitionedFixtureBase.NewTenant(); await _fixture.Store.Advanced.AddMartenManagedTenantsAsync(CancellationToken.None, tenant); - // Snapshot the global "high-water" BEFORE we append. var db = (MartenDatabase)_fixture.Store.Storage.Database; - var globalBefore = await db.FetchHighestEventSequenceNumber(CancellationToken.None); - var statsBefore = await db.FetchEventStoreStatistics(token: CancellationToken.None); - - // Now append 10 events to a fresh tenant. await _fixture.AppendNEventsAsync(tenant, 10); - var globalAfter = await db.FetchHighestEventSequenceNumber(CancellationToken.None); - var statsAfter = await db.FetchEventStoreStatistics(token: CancellationToken.None); + var high = await db.FetchHighestEventSequenceNumber(CancellationToken.None); + var max = (await db.FetchMaxEventSequenceAsync(CancellationToken.None)) ?? 0L; + var stats = await db.FetchEventStoreStatistics(token: CancellationToken.None); + + // No longer the stale 1 — at least this tenant's 10 events exist somewhere in the store. + high.ShouldBeGreaterThanOrEqualTo(10L, + "FetchHighestEventSequenceNumber must read the real max(seq_id) under partitioning, not the dead global sequence"); - // The global sequence is unchanged — pin the staleness. - globalAfter.ShouldBe(globalBefore, - "FetchHighestEventSequenceNumber reads the store-global sequence which is " + - "NEVER advanced under per-tenant partitioning — stale by design"); - statsAfter.EventSequenceNumber.ShouldBe(statsBefore.EventSequenceNumber, - "FetchEventStoreStatistics.EventSequenceNumber is the same dead value " + - "as FetchHighestEventSequenceNumber"); + // The two 'highest sequence' APIs and the statistics field are now consistent. + high.ShouldBe(max); + stats.EventSequenceNumber.ShouldBe(max, + "FetchEventStoreStatistics.EventSequenceNumber must match max(seq_id) under partitioning"); } [Fact] @@ -134,11 +127,12 @@ public async Task FetchMaxEventSequenceAsync_returns_the_correct_high_water_unde // a sibling tenant has already written more events. maxAfter.ShouldBeGreaterThanOrEqualTo(3L); - // And critically: this MUST diverge from FetchHighestEventSequenceNumber - // under partitioning — that's the whole point. + // #4705: FetchHighestEventSequenceNumber is now partition-aware too, so the two AGREE + // (both read max(seq_id) under partitioning). Before the fix this asserted divergence; the + // divergence WAS the bug. var globalSeq = await db.FetchHighestEventSequenceNumber(CancellationToken.None); - maxAfter.ShouldBeGreaterThan(globalSeq, - "FetchMaxEventSequenceAsync must diverge from the stale FetchHighestEventSequenceNumber " + - "under partitioning — this divergence IS the monitoring-grade pin"); + maxAfter.ShouldBe(globalSeq, + "FetchMaxEventSequenceAsync and FetchHighestEventSequenceNumber both read max(seq_id) " + + "under partitioning and must now agree"); } } diff --git a/src/TenantPartitionedEventsTests/Regressions/Bug_4705_versioned_composite_per_tenant.cs b/src/TenantPartitionedEventsTests/Regressions/Bug_4705_versioned_composite_per_tenant.cs new file mode 100644 index 0000000000..0139822329 --- /dev/null +++ b/src/TenantPartitionedEventsTests/Regressions/Bug_4705_versioned_composite_per_tenant.cs @@ -0,0 +1,191 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx; +using JasperFx.Core; +using JasperFx.Events; +using JasperFx.Events.Daemon; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events; +using Marten.Events.Aggregation; +using Marten.Events.Projections; +using Marten.Storage; +using Marten.Testing.Harness; +using Npgsql; +using Shouldly; +using Weasel.Core; +using Weasel.Postgresql; +using Xunit; +using Xunit.Abstractions; + +namespace TenantPartitionedEventsTests.Regressions; + +/// +/// #4705 — composite projection shards stalled at last_seq_id=1 under per-tenant event partitioning +/// while a standalone async projection advanced to the high-water. +/// +/// +/// Root cause: a composite runs an "optimized rebuild" via CompositeReplayExecutor whose ceiling +/// comes from IEventDatabase.FetchHighestEventSequenceNumber(). Marten implemented that as +/// select last_value from mt_events_sequence — the store-global sequence, never advanced under +/// UseTenantPartitionedEvents (per-tenant mt_events_sequence_{suffix} carry the real +/// seq_ids), so it read as 1 and the composite replayed only events 0..1. Fixed by making +/// FetchHighestEventSequenceNumber read max(seq_id) from mt_events under per-tenant +/// partitioning. The standalone projection was always immune — its continuous agent is driven by the +/// high-water detector (HighWaterMark), not that method. +/// +/// +/// +/// This guard runs the scenario at version 1 AND version 2 to show the stall was never about the +/// projection version (the reporter's stalled shards merely happened to be versioned); both must reach +/// the high-water. Single-DB, single tenant — per-tenant partitioning is the only load-bearing factor. +/// +/// +public partial class Bug_4705_versioned_composite_per_tenant +{ + private readonly ITestOutputHelper _output; + + // Read by the projection constructors at registration time. The theory sets it before building + // the store; xUnit runs theory cases sequentially within a class, and each case uses its own + // schema, so there is no cross-case bleed. + private static uint _version = 1; + + public Bug_4705_versioned_composite_per_tenant(ITestOutputHelper output) => _output = output; + + public class Bug4705Trip { public Guid Id { get; set; } public double Distance { get; set; } public int Version { get; set; } } + public class Bug4705Count { public Guid Id { get; set; } public int Count { get; set; } public int Version { get; set; } } + public class Bug4705Standalone { public Guid Id { get; set; } public double Total { get; set; } public int Version { get; set; } } + + public record Bug4705Started(Guid Id); + public record Bug4705Leg(double Distance); + + public partial class Bug4705TripProjection: SingleStreamProjection + { + public Bug4705TripProjection() { Name = "Bug4705Trip"; Version = _version; } + public void Apply(Bug4705Trip a, Bug4705Leg e) => a.Distance += e.Distance; + } + + public partial class Bug4705CountProjection: SingleStreamProjection + { + public Bug4705CountProjection() { Name = "Bug4705Count"; Version = _version; } + public void Apply(Bug4705Count a, Bug4705Leg e) => a.Count++; + } + + // Control: a standalone async projection (matches the reporter's InvoiceJournalEntries that DOES advance). + public partial class Bug4705StandaloneProjection: SingleStreamProjection + { + public Bug4705StandaloneProjection() { Name = "Bug4705Standalone"; Version = _version; } + public void Apply(Bug4705Standalone a, Bug4705Leg e) => a.Total += e.Distance; + } + + private static string SchemaFor(uint version) => $"bug4705_v{version}_p{Environment.ProcessId}"; + + private static void configure(StoreOptions opts, uint version) + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = SchemaFor(version); + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Events.UseTenantPartitionedEvents = true; + opts.Events.AppendMode = EventAppendMode.QuickWithServerTimestamps; + opts.Policies.AllDocumentsAreMultiTenanted(); + + // Short doc aliases — the nested test class names overflow PostgreSQL's 64-char identifier limit. + opts.Schema.For().DocumentAlias("b4705_trip"); + opts.Schema.For().DocumentAlias("b4705_cnt"); + opts.Schema.For().DocumentAlias("b4705_std"); + + // Standalone control. + opts.Projections.Add(ProjectionLifecycle.Async); + + // Versioned composite bundle with two SingleStream members. + opts.Projections.CompositeProjectionFor("bug4705-composite", c => + { + c.Version = version; + c.Add(stageNumber: 1); + c.Add(stageNumber: 2); + }); + } + + [Theory] + [InlineData(1u)] + [InlineData(2u)] + public async Task continuous_composite_under_per_tenant_partitioning(uint version) + { + _version = version; + var schema = SchemaFor(version); + + using var store = (DocumentStore)DocumentStore.For(o => configure(o, version)); + await store.Advanced.Clean.CompletelyRemoveAllAsync(); + await store.Storage.Database.EnsureStorageExistsAsync(typeof(IEvent)); + + // #4705 reporter seeds a SINGLE tenant. That matters under per-tenant partitioning: each + // tenant's events use a per-tenant sequence starting at 1, so seq_id is only globally unique + // when there is one tenant. With one tenant a store-global :All shard can page the global + // seq cursor correctly (the standalone advances); the composite stall is then isolated. + var tenants = new[] { "t4705_solo" }; + await store.Advanced.AddMartenManagedTenantsAsync(CancellationToken.None, tenants); + + long appended = 0; + foreach (var tenant in tenants) + { + await using var session = store.LightweightSession(tenant); + for (var s = 0; s < 10; s++) + { + var streamId = Guid.NewGuid(); + session.Events.StartStream(streamId, + new Bug4705Started(streamId), new Bug4705Leg(1.0), new Bug4705Leg(2.0), new Bug4705Leg(3.0)); + appended += 4; + } + + await session.SaveChangesAsync(); + } + + using var daemon = await store.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + + // Poll the progression rows until the composite catches up to the high-water, or time out. + var sw = Stopwatch.StartNew(); + List<(string Name, long Seq)> rows = new(); + long highWater = 0, composite = 0, standalone = 0; + while (sw.Elapsed < 25.Seconds()) + { + rows = await progressionRowsAsync(schema); + highWater = rows.FirstOrDefault(r => r.Name == "HighWaterMark").Seq; + composite = rows.Where(r => r.Name.StartsWith("bug4705-composite")).Select(r => r.Seq).DefaultIfEmpty(0).Min(); + standalone = rows.Where(r => r.Name.StartsWith("Bug4705Standalone")).Select(r => r.Seq).DefaultIfEmpty(0).Max(); + if (highWater > 0 && composite >= highWater && standalone >= highWater) break; + await Task.Delay(500); + } + + await daemon.StopAllAsync(); + + _output.WriteLine($"=== Version {version}: appended {appended} events, highWater={highWater}, " + + $"standalone={standalone}, composite(min)={composite} ==="); + foreach (var (name, seq) in rows) _output.WriteLine($"{seq,6} | {name}"); + + // Control sanity: the standalone projection should always reach the high-water. + standalone.ShouldBe(highWater, $"[v{version}] standalone projection should reach high-water {highWater}"); + + // The actual question: does the composite reach the high-water, or stall (e.g. at 1)? + composite.ShouldBe(highWater, + $"[v{version}] composite shards stalled at {composite} of high-water {highWater} " + + $"(rows: {string.Join("; ", rows.Select(r => $"{r.Name}={r.Seq}"))})"); + } + + private static async Task> progressionRowsAsync(string schema) + { + await using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString); + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand( + $"select name, coalesce(last_seq_id,0) from {schema}.mt_event_progression order by name"); + await using var reader = await cmd.ExecuteReaderAsync(); + var rows = new List<(string, long)>(); + while (await reader.ReadAsync()) rows.Add((reader.GetString(0), reader.GetInt64(1))); + return rows; + } +}