Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions src/Persistence/MartenSubscriptionTests/subscriptions_end_to_end.cs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,140 @@ public async Task use_filtered_publishing_subscription()
tracked.Executed.MessagesOf<IEvent<DEvent>>().Count().ShouldBe(6);
}

[Fact]
public async Task non_conjoined_store_preserves_legacy_default_tenant_fallthrough()
{
// Companion to carry_default_tenant_id_through_under_conjoined_tenancy:
// for non-conjoined stores the relay must keep falling through to the bus
// context's TenantId, so any setup that relied on the database identifier as
// the message tenant (e.g. per-tenant ancillary stores with a custom
// Database.Identifier) keeps working.
await dropSchema();

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Durability.Mode = DurabilityMode.Solo;

opts.Policies.UseDurableLocalQueues();

opts.Services.AddMarten(m =>
{
m.DisableNpgsqlLogging = true;
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "subscriptions";
// Default (single) tenancy on the events store
}).IntegrateWithWolverine()
.UseLightweightSessions()
.PublishEventsToWolverine("Publish", x =>
{
x.PublishEvent<AEvent>();
});
}).StartAsync();

var store = host.DocumentStore();

var daemon = await store.BuildProjectionDaemonAsync();

await daemon.StartAllAsync();

Func<IMessageContext, Task> writeEvents = async _ =>
{
await using var session = store.LightweightSession();
session.Events.StartStream(Guid.NewGuid(), new AEvent(), new AEvent());
await session.SaveChangesAsync();

await daemon.WaitForNonStaleData(30.Seconds());
};

var tracked = await host
.TrackActivity()
.ExecuteAndWaitAsync(writeEvents);

var aEnvelopes = tracked.MessageSucceeded.Envelopes()
.Where(x => x.Message is IEvent<AEvent>)
.ToList();

aEnvelopes.ShouldNotBeEmpty();

// Pre-fix and post-fix behaviour for non-conjoined stores: the relay falls through
// and envelope.TenantId is the bus context value that
// WolverineSubscriptionRunner set from operations.Database.Identifier
// (e.g. "Main" for a single-database Marten store). Crucially it must NOT be the
// default-tenant marker — that would mean the fix had over-applied and dropped the
// database-identifier-as-tenant convention used by some ancillary-store setups.
foreach (var envelope in aEnvelopes)
{
envelope.TenantId.ShouldNotBeNull();
envelope.TenantId.ShouldNotBe(JasperFx.StorageConstants.DefaultTenantId);
}
}

[Fact]
public async Task carry_default_tenant_id_through_under_conjoined_tenancy()
{
// Regression: PublishingRelay used to drop DeliveryOptions for default-tenant
// events, letting WolverineSubscriptionRunner's bus.TenantId
// (= operations.Database.Identifier) leak onto the outbound envelope. Under
// conjoined tenancy where the database identifier is not the tenant, that
// misrouted default-tenant events into the wrong tenant context.
await dropSchema();

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Durability.Mode = DurabilityMode.Solo;

opts.Policies.UseDurableLocalQueues();

opts.Services.AddMarten(m =>
{
m.DisableNpgsqlLogging = true;
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "subscriptions";
m.Events.TenancyStyle = Marten.Storage.TenancyStyle.Conjoined;
}).IntegrateWithWolverine()
.UseLightweightSessions()
.PublishEventsToWolverine("Publish", x =>
{
x.PublishEvent<AEvent>();
});
}).StartAsync();

var store = host.DocumentStore();

var daemon = await store.BuildProjectionDaemonAsync();

await daemon.StartAllAsync();

Func<IMessageContext, Task> writeEvents = async _ =>
{
// No tenant arg -> events get IEvent.TenantId == StorageConstants.DefaultTenantId
await using var session = store.LightweightSession();
session.Events.StartStream(Guid.NewGuid(), new AEvent(), new AEvent());
await session.SaveChangesAsync();

await daemon.WaitForNonStaleData(30.Seconds());
};

var tracked = await host
.TrackActivity()
.ExecuteAndWaitAsync(writeEvents);

var aEnvelopes = tracked.MessageSucceeded.Envelopes()
.Where(x => x.Message is IEvent<AEvent>)
.ToList();

aEnvelopes.ShouldNotBeEmpty();

foreach (var envelope in aEnvelopes)
{
// Marten's IEvent for a default-tenant event carries TenantId == "*DEFAULT*";
// the relayed Wolverine envelope must propagate that, not the database identifier.
envelope.TenantId.ShouldBe(JasperFx.StorageConstants.DefaultTenantId);
}
}

[Fact]
public async Task carry_the_tenant_id_through_on_the_subscription()
{
Expand Down
17 changes: 14 additions & 3 deletions src/Persistence/Wolverine.Marten/Subscriptions/PublishingRelay.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,26 @@ public override async Task ProcessEventsAsync(EventRange page, ISubscriptionCont
}
else
{
if (e.TenantId != StorageConstants.DefaultTenantId)
// Under conjoined tenancy the tenant lives in the row, not the database, so
// IEvent<T>.TenantId is the authoritative attribution and must reach the
// outbound envelope verbatim — including StorageConstants.DefaultTenantId for
// default-tenant events. Otherwise WolverineSubscriptionRunner's
// bus.TenantId (= operations.Database.Identifier, a database identity) would
// misroute them. For non-conjoined stores the existing fallthrough is
// preserved so that any setup relying on the database identifier as the
// message tenant keeps working.
var tenancyStyle = ((IDocumentSession)operations).DocumentStore
.Options.Events.TenancyStyle;

if (e.TenantId != StorageConstants.DefaultTenantId
|| tenancyStyle == TenancyStyle.Conjoined)
{
await bus.PublishAsync(e, new DeliveryOptions{TenantId = e.TenantId});
await bus.PublishAsync(e, new DeliveryOptions { TenantId = e.TenantId });
}
else
{
await bus.PublishAsync(e);
}

}
}
}
Expand Down
Loading