diff --git a/src/Persistence/MartenSubscriptionTests/subscriptions_end_to_end.cs b/src/Persistence/MartenSubscriptionTests/subscriptions_end_to_end.cs index 4a85485ea..2f3a55f3f 100644 --- a/src/Persistence/MartenSubscriptionTests/subscriptions_end_to_end.cs +++ b/src/Persistence/MartenSubscriptionTests/subscriptions_end_to_end.cs @@ -332,6 +332,140 @@ public async Task use_filtered_publishing_subscription() tracked.Executed.MessagesOf>().Count().ShouldBe(6); } + [Fact] + public async Task non_conjoined_store_preserves_legacy_default_tenant_fallthrough() + { + // Companion to carry_default_tenant_id_through_under_conjoined_tenancy: + // for non-conjoined stores the relay must keep falling through to the bus + // context's TenantId, so any setup that relied on the database identifier as + // the message tenant (e.g. per-tenant ancillary stores with a custom + // Database.Identifier) keeps working. + await dropSchema(); + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + + opts.Policies.UseDurableLocalQueues(); + + opts.Services.AddMarten(m => + { + m.DisableNpgsqlLogging = true; + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "subscriptions"; + // Default (single) tenancy on the events store + }).IntegrateWithWolverine() + .UseLightweightSessions() + .PublishEventsToWolverine("Publish", x => + { + x.PublishEvent(); + }); + }).StartAsync(); + + var store = host.DocumentStore(); + + var daemon = await store.BuildProjectionDaemonAsync(); + + await daemon.StartAllAsync(); + + Func writeEvents = async _ => + { + await using var session = store.LightweightSession(); + session.Events.StartStream(Guid.NewGuid(), new AEvent(), new AEvent()); + await session.SaveChangesAsync(); + + await daemon.WaitForNonStaleData(30.Seconds()); + }; + + var tracked = await host + .TrackActivity() + .ExecuteAndWaitAsync(writeEvents); + + var aEnvelopes = tracked.MessageSucceeded.Envelopes() + .Where(x => x.Message is IEvent) + .ToList(); + + aEnvelopes.ShouldNotBeEmpty(); + + // Pre-fix and post-fix behaviour for non-conjoined stores: the relay falls through + // and envelope.TenantId is the bus context value that + // WolverineSubscriptionRunner set from operations.Database.Identifier + // (e.g. "Main" for a single-database Marten store). Crucially it must NOT be the + // default-tenant marker — that would mean the fix had over-applied and dropped the + // database-identifier-as-tenant convention used by some ancillary-store setups. + foreach (var envelope in aEnvelopes) + { + envelope.TenantId.ShouldNotBeNull(); + envelope.TenantId.ShouldNotBe(JasperFx.StorageConstants.DefaultTenantId); + } + } + + [Fact] + public async Task carry_default_tenant_id_through_under_conjoined_tenancy() + { + // Regression: PublishingRelay used to drop DeliveryOptions for default-tenant + // events, letting WolverineSubscriptionRunner's bus.TenantId + // (= operations.Database.Identifier) leak onto the outbound envelope. Under + // conjoined tenancy where the database identifier is not the tenant, that + // misrouted default-tenant events into the wrong tenant context. + await dropSchema(); + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + + opts.Policies.UseDurableLocalQueues(); + + opts.Services.AddMarten(m => + { + m.DisableNpgsqlLogging = true; + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "subscriptions"; + m.Events.TenancyStyle = Marten.Storage.TenancyStyle.Conjoined; + }).IntegrateWithWolverine() + .UseLightweightSessions() + .PublishEventsToWolverine("Publish", x => + { + x.PublishEvent(); + }); + }).StartAsync(); + + var store = host.DocumentStore(); + + var daemon = await store.BuildProjectionDaemonAsync(); + + await daemon.StartAllAsync(); + + Func writeEvents = async _ => + { + // No tenant arg -> events get IEvent.TenantId == StorageConstants.DefaultTenantId + await using var session = store.LightweightSession(); + session.Events.StartStream(Guid.NewGuid(), new AEvent(), new AEvent()); + await session.SaveChangesAsync(); + + await daemon.WaitForNonStaleData(30.Seconds()); + }; + + var tracked = await host + .TrackActivity() + .ExecuteAndWaitAsync(writeEvents); + + var aEnvelopes = tracked.MessageSucceeded.Envelopes() + .Where(x => x.Message is IEvent) + .ToList(); + + aEnvelopes.ShouldNotBeEmpty(); + + foreach (var envelope in aEnvelopes) + { + // Marten's IEvent for a default-tenant event carries TenantId == "*DEFAULT*"; + // the relayed Wolverine envelope must propagate that, not the database identifier. + envelope.TenantId.ShouldBe(JasperFx.StorageConstants.DefaultTenantId); + } + } + [Fact] public async Task carry_the_tenant_id_through_on_the_subscription() { diff --git a/src/Persistence/Wolverine.Marten/AncillaryWolverineOptionsMartenExtensions.cs b/src/Persistence/Wolverine.Marten/AncillaryWolverineOptionsMartenExtensions.cs index 71c564774..10a5a1b87 100644 --- a/src/Persistence/Wolverine.Marten/AncillaryWolverineOptionsMartenExtensions.cs +++ b/src/Persistence/Wolverine.Marten/AncillaryWolverineOptionsMartenExtensions.cs @@ -357,7 +357,7 @@ public static IServiceCollection PublishEventsToWolverine(this IServiceCollec { var runtime = sp.GetRequiredService(); - var relay = new PublishingRelay(subscriptionName); + var relay = new PublishingRelay(subscriptionName, opts.Events.TenancyStyle); configure?.Invoke(relay); var subscription = new WolverineSubscriptionRunner(relay, runtime); diff --git a/src/Persistence/Wolverine.Marten/Subscriptions/PublishingRelay.cs b/src/Persistence/Wolverine.Marten/Subscriptions/PublishingRelay.cs index ee560249b..64a76f2af 100644 --- a/src/Persistence/Wolverine.Marten/Subscriptions/PublishingRelay.cs +++ b/src/Persistence/Wolverine.Marten/Subscriptions/PublishingRelay.cs @@ -60,9 +60,24 @@ public interface IPublishingRelay internal class PublishingRelay : BatchSubscription, IPublishingRelay { private ImHashMap _publishers = ImHashMap.Empty; + private readonly Func _relay; - public PublishingRelay(string subscriptionName) : base(subscriptionName) + public PublishingRelay(string subscriptionName, TenancyStyle tenancyStyle) : base(subscriptionName) { + // Bind the per-event publish path once at construction so the hot loop in + // ProcessEventsAsync needs neither a TenancyStyle comparison nor a downcast + // through IDocumentSession.DocumentStore. Under conjoined tenancy + // IEvent.TenantId is the authoritative attribution (the tenant lives in + // the row, not the database) and must reach the outbound envelope verbatim + // — including StorageConstants.DefaultTenantId for default-tenant events, + // otherwise WolverineSubscriptionRunner's bus.TenantId + // (= operations.Database.Identifier) silently misroutes them. For + // non-conjoined stores the legacy fallthrough is preserved so any setup + // relying on the database identifier as the message tenant keeps working. + // See GH-2675. + _relay = tenancyStyle == TenancyStyle.Conjoined + ? RelayWithEventTenant + : RelayWithLegacyFallthrough; } public void PublishEvent(Func, IMessageBus, ValueTask> publish) where T : notnull @@ -94,19 +109,19 @@ public override async Task ProcessEventsAsync(EventRange page, ISubscriptionCont } else { - if (e.TenantId != StorageConstants.DefaultTenantId) - { - await bus.PublishAsync(e, new DeliveryOptions{TenantId = e.TenantId}); - } - else - { - await bus.PublishAsync(e); - } - + await _relay(e, bus); } } } + private static ValueTask RelayWithEventTenant(IEvent e, IMessageBus bus) + => bus.PublishAsync(e, new DeliveryOptions { TenantId = e.TenantId }); + + private static ValueTask RelayWithLegacyFallthrough(IEvent e, IMessageBus bus) + => e.TenantId != StorageConstants.DefaultTenantId + ? bus.PublishAsync(e, new DeliveryOptions { TenantId = e.TenantId }) + : bus.PublishAsync(e); + internal interface IPublisher { ValueTask PublishAsync(object e, IMessageBus bus); diff --git a/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs b/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs index d86e1c5fe..f132e7116 100644 --- a/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs +++ b/src/Persistence/Wolverine.Marten/WolverineOptionsMartenExtensions.cs @@ -462,7 +462,7 @@ public static IServiceCollection PublishEventsToWolverine(this IServiceCollectio { var runtime = sp.GetRequiredService(); - var relay = new PublishingRelay(subscriptionName); + var relay = new PublishingRelay(subscriptionName, opts.Events.TenancyStyle); configure?.Invoke(relay); var subscription = new WolverineSubscriptionRunner(relay, runtime);