diff --git a/src/Marten/AdvancedOperations.cs b/src/Marten/AdvancedOperations.cs index 19916aec76..8a499ce63c 100644 --- a/src/Marten/AdvancedOperations.cs +++ b/src/Marten/AdvancedOperations.cs @@ -285,6 +285,58 @@ public async Task RebuildSingleStreamAsync(Guid id, CancellationToken token = await session.SaveChangesAsync(token).ConfigureAwait(false); } + /// + /// Tenant-scoped overload of . + /// Convenience method to rebuild the projected document of type T for a single stream + /// identified by under the supplied . + /// Required when is + /// or when running under sharded + /// multi-tenancy — the event load and the upsert both have to be scoped to the + /// tenant or you get a default-tenant lookup miss / write. + /// *You will still have to call SaveChangesAsync() to commit the changes though!* + /// #4668 — new overload, not a default parameter, so existing call sites bind unchanged. + /// + /// The string-keyed stream id. + /// Tenant id to scope the rebuild session and the projected-document upsert to. + /// + /// + public async Task RebuildSingleStreamAsync(string streamKey, string tenantId, CancellationToken token = default) where T : class + { + await using var session = _store.LightweightSession(tenantId); + var document = await session.Events.AggregateStreamAsync(streamKey, token: token).ConfigureAwait(false); + session.Store(document!); + await session.SaveChangesAsync(token).ConfigureAwait(false); + } + + /// + /// Tenant-scoped overload of . + /// Convenience method to rebuild the projected document of type T for a single stream + /// identified by under the supplied . + /// Required when is + /// or when running under sharded + /// multi-tenancy — the event load and the upsert both have to be scoped to the + /// tenant or you get a default-tenant lookup miss / write. + /// *You will still have to call SaveChangesAsync() to commit the changes though!* + /// #4668 — new overload, not a default parameter, so existing call sites bind unchanged. + /// + /// The Guid-keyed stream id. + /// Tenant id to scope the rebuild session and the projected-document upsert to. + /// + /// + public async Task RebuildSingleStreamAsync(Guid streamId, string tenantId, CancellationToken token = default) where T : class + { + // Mirror the Guid-overload's ConcurrencyChecks.Disabled posture so behavior matches + // the existing non-tenanted Guid path beyond the tenant scope. + await using var session = _store.LightweightSession(new SessionOptions + { + ConcurrencyChecks = ConcurrencyChecks.Disabled, + TenantId = tenantId + }); + var document = await session.Events.AggregateStreamAsync(streamId, token: token).ConfigureAwait(false); + session.Store(document!); + await session.SaveChangesAsync(token).ConfigureAwait(false); + } + /// /// "Upsert" tenant ids and matching partition suffixes to all conjoined, multi-tenanted /// tables *if* Marten-managed partitioning is applied to this store. This assumes a 1-1 diff --git a/src/MultiTenancyTests/rebuild_single_stream_tenant_overloads.cs b/src/MultiTenancyTests/rebuild_single_stream_tenant_overloads.cs new file mode 100644 index 0000000000..f8a23b91ec --- /dev/null +++ b/src/MultiTenancyTests/rebuild_single_stream_tenant_overloads.cs @@ -0,0 +1,319 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Events; +using Marten; +using Marten.Events; +using Marten.Storage; +using Marten.Testing.Harness; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Weasel.Postgresql; +using Xunit; + +namespace MultiTenancyTests; + +/// +/// Coverage for the #4668 RebuildSingleStreamAsync tenant-aware +/// overloads on : +/// +/// +/// RebuildSingleStreamAsync<T>(Guid streamId, string tenantId, CancellationToken) +/// RebuildSingleStreamAsync<T>(string streamKey, string tenantId, CancellationToken) +/// +/// +/// +/// Both overloads must drive the underlying AggregateStreamAsync and +/// the projected-document upsert against the supplied tenant id. Two +/// tenancy shapes are exercised here: +/// +/// +/// +/// Conjoined tenancy — one database, multiple tenants +/// separated by tenant_id on mt_events and the projected +/// document tables. +/// Per-database multi-tenancy — one database per tenant via +/// . The rebuild +/// session must route to the right tenant database. +/// +/// +/// +/// Each test runs the rebuild for tenant A, then verifies (a) tenant A's +/// projection reflects A's events and (b) tenant B's projection is +/// unchanged from its own events — i.e. the rebuild stays scoped. +/// +/// +public class rebuild_single_stream_tenant_overloads +{ + // ---- Conjoined tenancy ------------------------------------------------ + + [Fact] + public async Task conjoined_rebuild_by_guid_with_tenant_id() + { + using var store = DocumentStore.For(opts => + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = "rebuild_4668_conj_guid"; + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Policies.AllDocumentsAreMultiTenanted(); + }); + await store.Advanced.Clean.CompletelyRemoveAllAsync(); + + // tenant A: 3 AEvents → ACount = 3 after rebuild + // tenant B: 2 BEvents → BCount = 2 after rebuild (must NOT pick up A's events) + var streamA = Guid.NewGuid(); + var streamB = Guid.NewGuid(); + + await using (var session = store.LightweightSession("tenantA")) + { + session.Events.StartStream(streamA, + new AEvent_4668(), new AEvent_4668(), new AEvent_4668()); + await session.SaveChangesAsync(); + } + + await using (var session = store.LightweightSession("tenantB")) + { + session.Events.StartStream(streamB, + new BEvent_4668(), new BEvent_4668()); + await session.SaveChangesAsync(); + } + + // Rebuild tenant A's projection by Guid + tenant id. + await store.Advanced.RebuildSingleStreamAsync(streamA, "tenantA"); + + await using (var query = store.QuerySession("tenantA")) + { + var docA = await query.LoadAsync(streamA); + docA.ShouldNotBeNull("tenant A's projection must materialize for the rebuilt stream"); + docA.ACount.ShouldBe(3, "tenant A's AEvents must roll up into ACount"); + docA.BCount.ShouldBe(0, "tenant A's stream has no BEvents"); + } + + // Now rebuild tenant B; the new overload must route the AggregateStream + // read AND the upsert to tenant B's row, leaving tenant A's doc untouched. + await store.Advanced.RebuildSingleStreamAsync(streamB, "tenantB"); + + await using (var query = store.QuerySession("tenantB")) + { + var docB = await query.LoadAsync(streamB); + docB.ShouldNotBeNull("tenant B's projection must materialize for the rebuilt stream"); + docB.ACount.ShouldBe(0, "tenant B's stream has no AEvents"); + docB.BCount.ShouldBe(2, "tenant B's BEvents must roll up into BCount"); + } + + // Cross-tenant isolation pin: loading tenant B's stream id from tenant A + // (or vice versa) returns null — confirms the tenant scope held through + // the rebuild. + await using (var query = store.QuerySession("tenantA")) + { + var crossB = await query.LoadAsync(streamB); + crossB.ShouldBeNull("tenant B's projection must NOT bleed into tenant A's view"); + } + } + + [Fact] + public async Task conjoined_rebuild_by_string_key_with_tenant_id() + { + using var store = DocumentStore.For(opts => + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = "rebuild_4668_conj_str"; + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Policies.AllDocumentsAreMultiTenanted(); + }); + await store.Advanced.Clean.CompletelyRemoveAllAsync(); + + var keyA = "stream-A-" + Guid.NewGuid(); + var keyB = "stream-B-" + Guid.NewGuid(); + + await using (var session = store.LightweightSession("tenantA")) + { + session.Events.StartStream(keyA, + new AEvent_4668(), new AEvent_4668(), new AEvent_4668(), new AEvent_4668()); + await session.SaveChangesAsync(); + } + + await using (var session = store.LightweightSession("tenantB")) + { + session.Events.StartStream(keyB, + new BEvent_4668()); + await session.SaveChangesAsync(); + } + + await store.Advanced.RebuildSingleStreamAsync(keyA, "tenantA"); + + await using (var query = store.QuerySession("tenantA")) + { + var docA = await query.LoadAsync(keyA); + docA.ShouldNotBeNull(); + docA.ACount.ShouldBe(4); + docA.BCount.ShouldBe(0); + } + + await store.Advanced.RebuildSingleStreamAsync(keyB, "tenantB"); + + await using (var query = store.QuerySession("tenantB")) + { + var docB = await query.LoadAsync(keyB); + docB.ShouldNotBeNull(); + docB.ACount.ShouldBe(0); + docB.BCount.ShouldBe(1); + } + } + + // ---- Per-database multi-tenancy --------------------------------------- + // + // MultiTenantedWithSingleServer puts each tenant in its own physical + // database. RebuildSingleStreamAsync's session has to route to the + // right per-tenant database for both the event load and the upsert. + + [Fact] + public async Task per_database_rebuild_by_guid_with_tenant_id() + { + using var host = await Host.CreateDefaultBuilder() + .ConfigureServices(s => + { + s.AddMarten(opts => + { + opts.MultiTenantedWithSingleServer( + ConnectionSource.ConnectionString, + t => t.WithTenants("rebuild4668_g_tenantA", "rebuild4668_g_tenantB")); + + opts.DatabaseSchemaName = "rebuild_4668_perdb_guid"; + }).ApplyAllDatabaseChangesOnStartup(); + }).StartAsync(); + + var store = host.Services.GetRequiredService(); + + // Each tenant gets its own DB. Reset both so prior runs don't leak. + await store.Advanced.ResetAllData(); + + var streamA = Guid.NewGuid(); + var streamB = Guid.NewGuid(); + + await using (var session = store.LightweightSession("rebuild4668_g_tenantA")) + { + session.Events.StartStream(streamA, + new AEvent_4668(), new AEvent_4668()); + await session.SaveChangesAsync(); + } + + await using (var session = store.LightweightSession("rebuild4668_g_tenantB")) + { + session.Events.StartStream(streamB, + new BEvent_4668(), new BEvent_4668(), new BEvent_4668()); + await session.SaveChangesAsync(); + } + + await store.Advanced.RebuildSingleStreamAsync( + streamA, "rebuild4668_g_tenantA"); + + await using (var query = store.QuerySession("rebuild4668_g_tenantA")) + { + var doc = await query.LoadAsync(streamA); + doc.ShouldNotBeNull("rebuild must persist into tenant A's database"); + doc.ACount.ShouldBe(2); + } + + await store.Advanced.RebuildSingleStreamAsync( + streamB, "rebuild4668_g_tenantB"); + + await using (var query = store.QuerySession("rebuild4668_g_tenantB")) + { + var doc = await query.LoadAsync(streamB); + doc.ShouldNotBeNull("rebuild must persist into tenant B's database"); + doc.BCount.ShouldBe(3); + } + + await host.StopAsync(); + } + + [Fact] + public async Task per_database_rebuild_by_string_key_with_tenant_id() + { + using var host = await Host.CreateDefaultBuilder() + .ConfigureServices(s => + { + s.AddMarten(opts => + { + opts.MultiTenantedWithSingleServer( + ConnectionSource.ConnectionString, + t => t.WithTenants("rebuild4668_s_tenantA", "rebuild4668_s_tenantB")); + + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.DatabaseSchemaName = "rebuild_4668_perdb_str"; + }).ApplyAllDatabaseChangesOnStartup(); + }).StartAsync(); + + var store = host.Services.GetRequiredService(); + await store.Advanced.ResetAllData(); + + var keyA = "stream-A-" + Guid.NewGuid(); + var keyB = "stream-B-" + Guid.NewGuid(); + + await using (var session = store.LightweightSession("rebuild4668_s_tenantA")) + { + session.Events.StartStream(keyA, + new AEvent_4668(), new AEvent_4668(), new AEvent_4668()); + await session.SaveChangesAsync(); + } + + await using (var session = store.LightweightSession("rebuild4668_s_tenantB")) + { + session.Events.StartStream(keyB, + new BEvent_4668(), new BEvent_4668()); + await session.SaveChangesAsync(); + } + + await store.Advanced.RebuildSingleStreamAsync( + keyA, "rebuild4668_s_tenantA"); + + await using (var query = store.QuerySession("rebuild4668_s_tenantA")) + { + var doc = await query.LoadAsync(keyA); + doc.ShouldNotBeNull("rebuild must persist into tenant A's database"); + doc.ACount.ShouldBe(3); + } + + await store.Advanced.RebuildSingleStreamAsync( + keyB, "rebuild4668_s_tenantB"); + + await using (var query = store.QuerySession("rebuild4668_s_tenantB")) + { + var doc = await query.LoadAsync(keyB); + doc.ShouldNotBeNull("rebuild must persist into tenant B's database"); + doc.BCount.ShouldBe(2); + } + + await host.StopAsync(); + } +} + +// ---- Test types ---------------------------------------------------------- + +public class Bug4668Aggregate +{ + public Guid Id { get; set; } + public int ACount { get; set; } + public int BCount { get; set; } + + public void Apply(AEvent_4668 _) => ACount++; + public void Apply(BEvent_4668 _) => BCount++; +} + +public class Bug4668KeyedAggregate +{ + public string Id { get; set; } = string.Empty; + public int ACount { get; set; } + public int BCount { get; set; } + + public void Apply(AEvent_4668 _) => ACount++; + public void Apply(BEvent_4668 _) => BCount++; +} + +// Suffixed with "_4668" to avoid colliding with AEvent / BEvent already +// defined elsewhere in the MultiTenancyTests project. +public record AEvent_4668; +public record BEvent_4668;