diff --git a/src/Persistence/MartenTests/Bugs/Bug_4268_inline_side_effects_should_not_unpartition_envelope.cs b/src/Persistence/MartenTests/Bugs/Bug_4268_inline_side_effects_should_not_unpartition_envelope.cs new file mode 100644 index 000000000..286ae0d57 --- /dev/null +++ b/src/Persistence/MartenTests/Bugs/Bug_4268_inline_side_effects_should_not_unpartition_envelope.cs @@ -0,0 +1,247 @@ +using IntegrationTests; +using JasperFx; +using JasperFx.Core; +using JasperFx.Events; +using JasperFx.Events.Daemon; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events; +using Marten.Events.Aggregation; +using Marten.Storage; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Npgsql; +using Shouldly; +using Weasel.Postgresql; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Tracking; + +namespace MartenTests.Bugs; + +public class Bug_4268_inline_side_effects_should_not_unpartition_envelope +{ + private const string SchemaName = "bug4268"; + + [Fact] + public async Task inline_projection_side_effects_should_not_try_to_remove_tenant_id_from_envelope_storage() + { + await DropSchemasAsync(); + + // Step 1: build the schema under async projections. Even though the + // ancillary store applies AllDocumentsAreMultiTenantedWithPartitioning + // to every document, Wolverine's Envelope outbox table must be exempt — + // otherwise two stores sharing a schema drift apart on its shape and + // the next schema diff emits an impossible "drop partitioning column" + // migration. See GH-2566 / marten#4268. + await BuildOriginalAsyncProjectionStorageAsync(); + (await EnvelopeStorageIsTenantPartitionedAsync()).ShouldBeFalse( + "Envelope storage should stay single-tenant / unpartitioned regardless of the store's blanket AllDocumentsAreMultiTenantedWithPartitioning policy"); + + // Step 2: flip to inline projections + enable side effects. Without the + // fix this threw Marten.Exceptions.MartenSchemaException wrapping + // "unique constraint on partitioned table must include all partitioning + // columns" on the emitted "alter table ... drop column tenant_id" DDL. + var exception = await Record.ExceptionAsync(TriggerInlineProjectionSideEffectAsync); + + if (exception is not null) + { + exception.ToString().ShouldNotContain("drop column tenant_id"); + } + + exception.ShouldBeNull(); + } + + private static async Task BuildOriginalAsyncProjectionStorageAsync() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + opts.Durability.EnableInboxPartitioning = true; + + ConfigureMainStore(opts, enableInlineSideEffects: false); + + opts.Services.AddMartenStore(_ => + { + var m = new StoreOptions(); + ConfigureAncillaryStore(m); + return m; + }) + .AddProjectionWithServices(ProjectionLifecycle.Async, ServiceLifetime.Singleton) + .IntegrateWithWolverine() + .AddAsyncDaemon(DaemonMode.Solo); + + opts.Policies.UseDurableLocalQueues(); + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(Bug4268SideEffectHandler)); + }).StartAsync(); + + var streamId = Guid.NewGuid(); + var store = host.Services.GetRequiredService(); + + await host.TrackActivity() + .Timeout(30.Seconds()) + .WaitForMessageToBeReceivedAt(host) + .ExecuteAndWaitAsync((Func)(async _ => + { + await using var session = store.LightweightSession("tenant1"); + session.Events.StartStream(streamId, new Bug4268Started()); + await session.SaveChangesAsync(); + })); + } + + private static async Task TriggerInlineProjectionSideEffectAsync() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + opts.Durability.EnableInboxPartitioning = true; + + ConfigureMainStore(opts, enableInlineSideEffects: true); + + opts.Services.AddMartenStore(_ => + { + var m = new StoreOptions(); + ConfigureAncillaryStore(m); + m.Events.EnableSideEffectsOnInlineProjections = true; + return m; + }) + .AddProjectionWithServices(ProjectionLifecycle.Inline, ServiceLifetime.Singleton) + .IntegrateWithWolverine(); + + opts.Policies.UseDurableLocalQueues(); + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(Bug4268SideEffectHandler)); + }).StartAsync(); + + var store = host.Services.GetRequiredService(); + await using var session = store.LightweightSession("tenant1"); + session.Events.StartStream(Guid.NewGuid(), new Bug4268MainStarted()); + await session.SaveChangesAsync(); + } + + private static void ConfigureMainStore(WolverineOptions opts, bool enableInlineSideEffects) + { + opts.Services.AddMarten(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = SchemaName; + m.Events.DatabaseSchemaName = SchemaName; + m.Events.TenancyStyle = TenancyStyle.Conjoined; + m.Advanced.DefaultTenantUsageEnabled = false; + m.Schema.For().MultiTenanted(); + + if (enableInlineSideEffects) + { + m.Events.EnableSideEffectsOnInlineProjections = true; + m.Projections.Add(ProjectionLifecycle.Inline); + } + + m.DisableNpgsqlLogging = true; + }).IntegrateWithWolverine(); + } + + private static void ConfigureAncillaryStore(StoreOptions m) + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = SchemaName; + m.Events.DatabaseSchemaName = SchemaName; + m.Events.TenancyStyle = TenancyStyle.Conjoined; + m.Advanced.DefaultTenantUsageEnabled = false; + + // Do not configure Envelope directly. The existing envelope storage shape + // comes from the ancillary store's normal multi-tenanted document policy. + m.Policies.AllDocumentsAreMultiTenantedWithPartitioning(x => + { + x.ByHash("one", "two"); + }); + m.DisableNpgsqlLogging = true; + } + + private static async Task EnvelopeStorageIsTenantPartitionedAsync() + { + await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString); + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + select c.relkind = 'p' + from pg_class c + join pg_namespace n on n.oid = c.relnamespace + join information_schema.columns col on col.table_schema = n.nspname and col.table_name = c.relname + where n.nspname = @schema + and c.relname = 'mt_doc_envelope' + and col.column_name = 'tenant_id' + """; + cmd.Parameters.AddWithValue("schema", SchemaName); + + return await cmd.ExecuteScalarAsync() as bool? == true; + } + + private static async Task DropSchemasAsync() + { + await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString); + await conn.OpenAsync(); + await conn.DropSchemaAsync(SchemaName); + } +} + +public interface IBug4268Store : IDocumentStore; + +public record Bug4268Started; + +public record Bug4268MainStarted; + +public record Bug4268SideEffect(Guid StreamId); + +public class Bug4268Aggregate +{ + public Guid Id { get; set; } + + public static Bug4268Aggregate Create(Bug4268Started _) => new(); +} + +public class Bug4268MainAggregate +{ + public Guid Id { get; set; } + + public static Bug4268MainAggregate Create(Bug4268MainStarted _) => new(); +} + +public class Bug4268Projection : SingleStreamProjection +{ + public static Bug4268Aggregate Create(Bug4268Started _) => new(); + + public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice slice) + { + if (slice.Snapshot is not null) + { + slice.PublishMessage(new Bug4268SideEffect(slice.Snapshot.Id)); + } + + return ValueTask.CompletedTask; + } +} + +public class Bug4268MainProjection : SingleStreamProjection +{ + public static Bug4268MainAggregate Create(Bug4268MainStarted _) => new(); + + public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice slice) + { + if (slice.Snapshot is not null) + { + slice.PublishMessage(new Bug4268SideEffect(slice.Snapshot.Id)); + } + + return ValueTask.CompletedTask; + } +} + +public static class Bug4268SideEffectHandler +{ + public static void Handle(Bug4268SideEffect _) + { + } +} diff --git a/src/Persistence/Wolverine.Marten/MartenIntegration.cs b/src/Persistence/Wolverine.Marten/MartenIntegration.cs index c6572dc8f..41d4f9d88 100644 --- a/src/Persistence/Wolverine.Marten/MartenIntegration.cs +++ b/src/Persistence/Wolverine.Marten/MartenIntegration.cs @@ -7,6 +7,7 @@ using Marten.Exceptions; using Marten.Internal; using Marten.Schema; +using Marten.Storage; using Microsoft.Extensions.DependencyInjection; using Npgsql; using Weasel.Core; @@ -150,7 +151,23 @@ internal class MartenOverrides : IConfigureMarten public void Configure(IServiceProvider services, StoreOptions options) { options.Events.MessageOutbox = new MartenToWolverineOutbox(services); - + + // Envelope is Wolverine's operational outbox document. Keep it + // single-tenant and unpartitioned regardless of blanket document + // policies the user has applied (AllDocumentsAreMultiTenanted or + // AllDocumentsAreMultiTenantedWithPartitioning). Without this, + // two stores that share a database schema can disagree about + // mt_doc_envelope's shape, producing an impossible + // "drop partitioning column" migration on the next deploy. + // + // These per-type alterations on the DocumentMappingBuilder run + // AFTER Marten's applyPolicies / applyPostPolicies passes during + // DocumentMapping construction, so they reliably win over any + // blanket policy the user registered. See GH-2566 / marten#4268. + options.Schema.For() + .SingleTenanted() + .DoNotPartition(); + options.Policies.ForAllDocuments(mapping => { if (mapping.DocumentType.CanBeCastTo())