diff --git a/src/Persistence/MartenSubscriptionTests/subscriptions_end_to_end.cs b/src/Persistence/MartenSubscriptionTests/subscriptions_end_to_end.cs index 4a85485ea..2f3a55f3f 100644 --- a/src/Persistence/MartenSubscriptionTests/subscriptions_end_to_end.cs +++ b/src/Persistence/MartenSubscriptionTests/subscriptions_end_to_end.cs @@ -332,6 +332,140 @@ public async Task use_filtered_publishing_subscription() tracked.Executed.MessagesOf>().Count().ShouldBe(6); } + [Fact] + public async Task non_conjoined_store_preserves_legacy_default_tenant_fallthrough() + { + // Companion to carry_default_tenant_id_through_under_conjoined_tenancy: + // for non-conjoined stores the relay must keep falling through to the bus + // context's TenantId, so any setup that relied on the database identifier as + // the message tenant (e.g. per-tenant ancillary stores with a custom + // Database.Identifier) keeps working. + await dropSchema(); + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + + opts.Policies.UseDurableLocalQueues(); + + opts.Services.AddMarten(m => + { + m.DisableNpgsqlLogging = true; + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "subscriptions"; + // Default (single) tenancy on the events store + }).IntegrateWithWolverine() + .UseLightweightSessions() + .PublishEventsToWolverine("Publish", x => + { + x.PublishEvent(); + }); + }).StartAsync(); + + var store = host.DocumentStore(); + + var daemon = await store.BuildProjectionDaemonAsync(); + + await daemon.StartAllAsync(); + + Func writeEvents = async _ => + { + await using var session = store.LightweightSession(); + session.Events.StartStream(Guid.NewGuid(), new AEvent(), new AEvent()); + await session.SaveChangesAsync(); + + await daemon.WaitForNonStaleData(30.Seconds()); + }; + + var tracked = await host + .TrackActivity() + .ExecuteAndWaitAsync(writeEvents); + + var aEnvelopes = tracked.MessageSucceeded.Envelopes() + .Where(x => x.Message is IEvent) + .ToList(); + + aEnvelopes.ShouldNotBeEmpty(); + + // Pre-fix and post-fix behaviour for non-conjoined stores: the relay falls through + // and envelope.TenantId is the bus context value that + // WolverineSubscriptionRunner set from operations.Database.Identifier + // (e.g. "Main" for a single-database Marten store). Crucially it must NOT be the + // default-tenant marker — that would mean the fix had over-applied and dropped the + // database-identifier-as-tenant convention used by some ancillary-store setups. + foreach (var envelope in aEnvelopes) + { + envelope.TenantId.ShouldNotBeNull(); + envelope.TenantId.ShouldNotBe(JasperFx.StorageConstants.DefaultTenantId); + } + } + + [Fact] + public async Task carry_default_tenant_id_through_under_conjoined_tenancy() + { + // Regression: PublishingRelay used to drop DeliveryOptions for default-tenant + // events, letting WolverineSubscriptionRunner's bus.TenantId + // (= operations.Database.Identifier) leak onto the outbound envelope. Under + // conjoined tenancy where the database identifier is not the tenant, that + // misrouted default-tenant events into the wrong tenant context. + await dropSchema(); + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + + opts.Policies.UseDurableLocalQueues(); + + opts.Services.AddMarten(m => + { + m.DisableNpgsqlLogging = true; + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "subscriptions"; + m.Events.TenancyStyle = Marten.Storage.TenancyStyle.Conjoined; + }).IntegrateWithWolverine() + .UseLightweightSessions() + .PublishEventsToWolverine("Publish", x => + { + x.PublishEvent(); + }); + }).StartAsync(); + + var store = host.DocumentStore(); + + var daemon = await store.BuildProjectionDaemonAsync(); + + await daemon.StartAllAsync(); + + Func writeEvents = async _ => + { + // No tenant arg -> events get IEvent.TenantId == StorageConstants.DefaultTenantId + await using var session = store.LightweightSession(); + session.Events.StartStream(Guid.NewGuid(), new AEvent(), new AEvent()); + await session.SaveChangesAsync(); + + await daemon.WaitForNonStaleData(30.Seconds()); + }; + + var tracked = await host + .TrackActivity() + .ExecuteAndWaitAsync(writeEvents); + + var aEnvelopes = tracked.MessageSucceeded.Envelopes() + .Where(x => x.Message is IEvent) + .ToList(); + + aEnvelopes.ShouldNotBeEmpty(); + + foreach (var envelope in aEnvelopes) + { + // Marten's IEvent for a default-tenant event carries TenantId == "*DEFAULT*"; + // the relayed Wolverine envelope must propagate that, not the database identifier. + envelope.TenantId.ShouldBe(JasperFx.StorageConstants.DefaultTenantId); + } + } + [Fact] public async Task carry_the_tenant_id_through_on_the_subscription() { diff --git a/src/Persistence/Wolverine.Marten/Subscriptions/PublishingRelay.cs b/src/Persistence/Wolverine.Marten/Subscriptions/PublishingRelay.cs index ee560249b..cfbf745cc 100644 --- a/src/Persistence/Wolverine.Marten/Subscriptions/PublishingRelay.cs +++ b/src/Persistence/Wolverine.Marten/Subscriptions/PublishingRelay.cs @@ -94,15 +94,26 @@ public override async Task ProcessEventsAsync(EventRange page, ISubscriptionCont } else { - if (e.TenantId != StorageConstants.DefaultTenantId) + // Under conjoined tenancy the tenant lives in the row, not the database, so + // IEvent.TenantId is the authoritative attribution and must reach the + // outbound envelope verbatim — including StorageConstants.DefaultTenantId for + // default-tenant events. Otherwise WolverineSubscriptionRunner's + // bus.TenantId (= operations.Database.Identifier, a database identity) would + // misroute them. For non-conjoined stores the existing fallthrough is + // preserved so that any setup relying on the database identifier as the + // message tenant keeps working. + var tenancyStyle = ((IDocumentSession)operations).DocumentStore + .Options.Events.TenancyStyle; + + if (e.TenantId != StorageConstants.DefaultTenantId + || tenancyStyle == TenancyStyle.Conjoined) { - await bus.PublishAsync(e, new DeliveryOptions{TenantId = e.TenantId}); + await bus.PublishAsync(e, new DeliveryOptions { TenantId = e.TenantId }); } else { await bus.PublishAsync(e); } - } } }