Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions src/Persistence/MartenSubscriptionTests/subscriptions_end_to_end.cs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,140 @@ public async Task use_filtered_publishing_subscription()
tracked.Executed.MessagesOf<IEvent<DEvent>>().Count().ShouldBe(6);
}

[Fact]
public async Task non_conjoined_store_preserves_legacy_default_tenant_fallthrough()
{
// Companion to carry_default_tenant_id_through_under_conjoined_tenancy:
// for non-conjoined stores the relay must keep falling through to the bus
// context's TenantId, so any setup that relied on the database identifier as
// the message tenant (e.g. per-tenant ancillary stores with a custom
// Database.Identifier) keeps working.
await dropSchema();

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Durability.Mode = DurabilityMode.Solo;

opts.Policies.UseDurableLocalQueues();

opts.Services.AddMarten(m =>
{
m.DisableNpgsqlLogging = true;
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "subscriptions";
// Default (single) tenancy on the events store
}).IntegrateWithWolverine()
.UseLightweightSessions()
.PublishEventsToWolverine("Publish", x =>
{
x.PublishEvent<AEvent>();
});
}).StartAsync();

var store = host.DocumentStore();

var daemon = await store.BuildProjectionDaemonAsync();

await daemon.StartAllAsync();

Func<IMessageContext, Task> writeEvents = async _ =>
{
await using var session = store.LightweightSession();
session.Events.StartStream(Guid.NewGuid(), new AEvent(), new AEvent());
await session.SaveChangesAsync();

await daemon.WaitForNonStaleData(30.Seconds());
};

var tracked = await host
.TrackActivity()
.ExecuteAndWaitAsync(writeEvents);

var aEnvelopes = tracked.MessageSucceeded.Envelopes()
.Where(x => x.Message is IEvent<AEvent>)
.ToList();

aEnvelopes.ShouldNotBeEmpty();

// Pre-fix and post-fix behaviour for non-conjoined stores: the relay falls through
// and envelope.TenantId is the bus context value that
// WolverineSubscriptionRunner set from operations.Database.Identifier
// (e.g. "Main" for a single-database Marten store). Crucially it must NOT be the
// default-tenant marker — that would mean the fix had over-applied and dropped the
// database-identifier-as-tenant convention used by some ancillary-store setups.
foreach (var envelope in aEnvelopes)
{
envelope.TenantId.ShouldNotBeNull();
envelope.TenantId.ShouldNotBe(JasperFx.StorageConstants.DefaultTenantId);
}
}

[Fact]
public async Task carry_default_tenant_id_through_under_conjoined_tenancy()
{
// Regression: PublishingRelay used to drop DeliveryOptions for default-tenant
// events, letting WolverineSubscriptionRunner's bus.TenantId
// (= operations.Database.Identifier) leak onto the outbound envelope. Under
// conjoined tenancy where the database identifier is not the tenant, that
// misrouted default-tenant events into the wrong tenant context.
await dropSchema();

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Durability.Mode = DurabilityMode.Solo;

opts.Policies.UseDurableLocalQueues();

opts.Services.AddMarten(m =>
{
m.DisableNpgsqlLogging = true;
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "subscriptions";
m.Events.TenancyStyle = Marten.Storage.TenancyStyle.Conjoined;
}).IntegrateWithWolverine()
.UseLightweightSessions()
.PublishEventsToWolverine("Publish", x =>
{
x.PublishEvent<AEvent>();
});
}).StartAsync();

var store = host.DocumentStore();

var daemon = await store.BuildProjectionDaemonAsync();

await daemon.StartAllAsync();

Func<IMessageContext, Task> writeEvents = async _ =>
{
// No tenant arg -> events get IEvent.TenantId == StorageConstants.DefaultTenantId
await using var session = store.LightweightSession();
session.Events.StartStream(Guid.NewGuid(), new AEvent(), new AEvent());
await session.SaveChangesAsync();

await daemon.WaitForNonStaleData(30.Seconds());
};

var tracked = await host
.TrackActivity()
.ExecuteAndWaitAsync(writeEvents);

var aEnvelopes = tracked.MessageSucceeded.Envelopes()
.Where(x => x.Message is IEvent<AEvent>)
.ToList();

aEnvelopes.ShouldNotBeEmpty();

foreach (var envelope in aEnvelopes)
{
// Marten's IEvent for a default-tenant event carries TenantId == "*DEFAULT*";
// the relayed Wolverine envelope must propagate that, not the database identifier.
envelope.TenantId.ShouldBe(JasperFx.StorageConstants.DefaultTenantId);
}
}

[Fact]
public async Task carry_the_tenant_id_through_on_the_subscription()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public static IServiceCollection PublishEventsToWolverine<T>(this IServiceCollec
{
var runtime = sp.GetRequiredService<IWolverineRuntime>();

var relay = new PublishingRelay(subscriptionName);
var relay = new PublishingRelay(subscriptionName, opts.Events.TenancyStyle);
configure?.Invoke(relay);

var subscription = new WolverineSubscriptionRunner(relay, runtime);
Expand Down
35 changes: 25 additions & 10 deletions src/Persistence/Wolverine.Marten/Subscriptions/PublishingRelay.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,24 @@ public interface IPublishingRelay
internal class PublishingRelay : BatchSubscription, IPublishingRelay
{
private ImHashMap<Type, IPublisher> _publishers = ImHashMap<Type, IPublisher>.Empty;
private readonly Func<IEvent, IMessageBus, ValueTask> _relay;

public PublishingRelay(string subscriptionName) : base(subscriptionName)
public PublishingRelay(string subscriptionName, TenancyStyle tenancyStyle) : base(subscriptionName)
{
// Bind the per-event publish path once at construction so the hot loop in
// ProcessEventsAsync needs neither a TenancyStyle comparison nor a downcast
// through IDocumentSession.DocumentStore. Under conjoined tenancy
// IEvent<T>.TenantId is the authoritative attribution (the tenant lives in
// the row, not the database) and must reach the outbound envelope verbatim
// — including StorageConstants.DefaultTenantId for default-tenant events,
// otherwise WolverineSubscriptionRunner's bus.TenantId
// (= operations.Database.Identifier) silently misroutes them. For
// non-conjoined stores the legacy fallthrough is preserved so any setup
// relying on the database identifier as the message tenant keeps working.
// See GH-2675.
_relay = tenancyStyle == TenancyStyle.Conjoined
? RelayWithEventTenant
: RelayWithLegacyFallthrough;
}

public void PublishEvent<T>(Func<IEvent<T>, IMessageBus, ValueTask> publish) where T : notnull
Expand Down Expand Up @@ -94,19 +109,19 @@ public override async Task ProcessEventsAsync(EventRange page, ISubscriptionCont
}
else
{
if (e.TenantId != StorageConstants.DefaultTenantId)
{
await bus.PublishAsync(e, new DeliveryOptions{TenantId = e.TenantId});
}
else
{
await bus.PublishAsync(e);
}

await _relay(e, bus);
}
}
}

private static ValueTask RelayWithEventTenant(IEvent e, IMessageBus bus)
=> bus.PublishAsync(e, new DeliveryOptions { TenantId = e.TenantId });

private static ValueTask RelayWithLegacyFallthrough(IEvent e, IMessageBus bus)
=> e.TenantId != StorageConstants.DefaultTenantId
? bus.PublishAsync(e, new DeliveryOptions { TenantId = e.TenantId })
: bus.PublishAsync(e);

internal interface IPublisher
{
ValueTask PublishAsync(object e, IMessageBus bus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ public static IServiceCollection PublishEventsToWolverine(this IServiceCollectio
{
var runtime = sp.GetRequiredService<IWolverineRuntime>();

var relay = new PublishingRelay(subscriptionName);
var relay = new PublishingRelay(subscriptionName, opts.Events.TenancyStyle);
configure?.Invoke(relay);

var subscription = new WolverineSubscriptionRunner(relay, runtime);
Expand Down
Loading