diff --git a/Directory.Packages.props b/Directory.Packages.props index 7fbe2c279e..9f90370d58 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -13,8 +13,8 @@ - - + + all runtime; build; native; contentfiles; analyzers; buildtransitive @@ -66,8 +66,8 @@ - - + + diff --git a/src/ContainerScopedProjectionTests/projections_registered_directly_on_IServiceCollection.cs b/src/ContainerScopedProjectionTests/projections_registered_directly_on_IServiceCollection.cs new file mode 100644 index 0000000000..d571d84167 --- /dev/null +++ b/src/ContainerScopedProjectionTests/projections_registered_directly_on_IServiceCollection.cs @@ -0,0 +1,106 @@ +using System.Linq; +using System.Threading.Tasks; +using JasperFx.Events.Projections; +using Marten; +using Marten.Storage; +using Marten.Testing.Harness; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Xunit; + +namespace ContainerScopedProjectionTests; + +/// +/// Regression coverage for GitHub issue #4267: +/// AddProjectionWithServices was only available on the +/// MartenConfigurationExpression / MartenStoreExpression{T} +/// builder chain returned by AddMarten(). In modular-monolith setups +/// individual modules only have — the builder +/// has already been consumed at composition root — so the builder-only API is +/// unusable and forces callers into the undiscoverable +/// TProjection.Register{T}(services, …) JasperFx internal. +/// +/// This suite asserts that the new overloads +/// behave identically to the builder form for both the default store and +/// ancillary stores. +/// +[Collection("ioc")] +public class projections_registered_directly_on_IServiceCollection +{ + [Fact] + public async Task add_projection_with_services_on_IServiceCollection_registers_for_default_store() + { + using var host = await Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + services.AddSingleton(); + + // Builder consumed once … + services.AddMarten(opts => + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = "ioc_4267_default"; + opts.ApplyChangesLockId = opts.ApplyChangesLockId + 4267; + }); + + // … then a separate module/registration adds the projection using + // only IServiceCollection — no builder chain required. + services.AddProjectionWithServices( + ProjectionLifecycle.Inline, + ServiceLifetime.Singleton); + }) + .StartAsync(); + + var store = host.Services.GetRequiredService(); + + await using var session = store.LightweightSession(); + var streamId = session.Events.StartStream( + new ProductRegistered("Ankle Socks", "Socks")).Id; + await session.SaveChangesAsync(); + + var product = await session.LoadAsync(streamId); + product.ShouldNotBeNull(); + product!.Price.ShouldBeGreaterThan(0); // proves the IoC-resolved IPriceLookup ran + product.Name.ShouldBe("Ankle Socks"); + } + + [Fact] + public async Task add_projection_with_services_on_IServiceCollection_registers_for_ancillary_store() + { + using var host = await Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + services.AddSingleton(); + + services.AddMartenStore(opts => + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = "ioc_4267_ancillary"; + opts.ApplyChangesLockId = opts.ApplyChangesLockId + 4268; + }); + + services.AddProjectionWithServices( + ProjectionLifecycle.Inline, + ServiceLifetime.Singleton); + }) + .StartAsync(); + + var store = host.Services.GetRequiredService(); + + await using var session = store.LightweightSession(); + var streamId = session.Events.StartStream( + new ProductRegistered("Dress Socks", "Socks")).Id; + await session.SaveChangesAsync(); + + var product = await session.LoadAsync(streamId); + product.ShouldNotBeNull(); + product!.Name.ShouldBe("Dress Socks"); + + // Projection materialized a table in the ancillary store's schema. + var tables = store.Storage.AllObjects().OfType(); + tables.ShouldContain(x => x.DocumentType == typeof(Product)); + } +} + +public interface IProductsStore : IDocumentStore; diff --git a/src/EventSourcingTests/Bugs/Bug_4268_projection_async_to_inline_migration.cs b/src/EventSourcingTests/Bugs/Bug_4268_projection_async_to_inline_migration.cs new file mode 100644 index 0000000000..ccbd573e27 --- /dev/null +++ b/src/EventSourcingTests/Bugs/Bug_4268_projection_async_to_inline_migration.cs @@ -0,0 +1,149 @@ +using System.Threading.Tasks; +using JasperFx; +using JasperFx.Events; +using JasperFx.Events.Projections; +using JasperFx.MultiTenancy; +using Marten; +using Marten.Events.Aggregation; +using Marten.Events.Projections; +using Marten.Schema; +using Marten.Storage; +using Marten.Testing.Harness; +using Weasel.Core; +using Xunit; +using Xunit.Abstractions; + +namespace EventSourcingTests.Bugs; + +/// +/// Reproducer attempt for https://github.com/JasperFx/marten/issues/4268. +/// +/// The reporter changed three existing Async multi-stream projections +/// to Inline (also flipping EnableSideEffectsOnInlineProjections = true), +/// then hit a migration failure on the next event append: +/// +/// DDL Execution for 'All Configured Changes' Failed! +/// alter table public.mt_doc_envelope drop constraint pkey_mt_doc_envelope_tenant_id_id CASCADE; +/// alter table public.mt_doc_envelope add CONSTRAINT pkey_mt_doc_envelope_id PRIMARY KEY (id); +/// alter table public.mt_doc_envelope drop column tenant_id; +/// ... +/// ---> 0A000: unique constraint on partitioned table must include all +/// partitioning columns +/// +/// Core mystery: the outgoing migration *drops* the tenant_id column and +/// replaces the composite pkey with a single-column pkey, which means Marten +/// believes the target table's tenancy style is Single — but the *existing* +/// table was built with Conjoined + partitioning, so the DROP fails. +/// +/// This test runs the async → inline flip against a shared schema with +/// conjoined tenancy + archived-stream partitioning + envelope metadata +/// enabled on all documents (matching the reporter's CreateStoreOptions +/// helper) and expects the migration to succeed. If this test hangs or +/// throws, we have a reproducer. If it passes, we likely need more +/// info from the reporter. +/// +public class Bug_4268_projection_async_to_inline_migration : BugIntegrationContext +{ + private readonly ITestOutputHelper _output; + + public Bug_4268_projection_async_to_inline_migration(ITestOutputHelper output) + { + _output = output; + } + + [Fact] + public async Task switching_projection_from_async_to_inline_does_not_break_migration() + { + // 1) Build the schema with the projection registered as Async. + StoreOptions(opts => ConfigureStore(opts, ProjectionLifecycle.Async)); + await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + + // 2) Append an event and commit so the per-document tables (envelope + // doc, product doc) are materialized with the async-shaped DDL. + var streamId = System.Guid.NewGuid(); + await using (var session = theStore.LightweightSession("tenant-a")) + { + session.Events.StartStream(streamId, new Bug4268Registered("Socks")); + await session.SaveChangesAsync(); + } + + // 3) Spin up a SEPARATE store targeting the SAME schema, but with + // the projection flipped to Inline + side-effects-on-inline turned + // on. This mirrors the reporter's deploy: same DB, new code. + var inline = SeparateStore(opts => ConfigureStore(opts, ProjectionLifecycle.Inline)); + + // 4) Force the migration to run over the full model rather than + // just the document types that a single SaveChanges would + // materialize. This mirrors the widest possible surface the + // reporter's running app could hit on the first deploy and is + // where Marten's schema diff would emit the failing + // "drop partitioning column" DDL if the shape we hypothesized + // actually reproduces. + await inline.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + + // 5) And a subsequent append still has to succeed. + await using (var session = inline.LightweightSession("tenant-a")) + { + session.Events.StartStream( + System.Guid.NewGuid(), new Bug4268Registered("Hats")); + + // If the reporter's failure reproduces, this throws + // Marten.Exceptions.MartenSchemaException wrapping + // Npgsql.PostgresException 0A000. + await session.SaveChangesAsync(); + } + } + + private static void ConfigureStore(StoreOptions opts, ProjectionLifecycle lifecycle) + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = "bug_4268"; + + opts.TenantIdStyle = TenantIdStyle.ForceLowerCase; + + opts.Events.StreamIdentity = StreamIdentity.AsGuid; + opts.Events.TenancyStyle = TenancyStyle.Conjoined; + opts.Events.UseIdentityMapForAggregates = true; + opts.Events.AppendMode = EventAppendMode.Quick; + opts.Events.UseArchivedStreamPartitioning = true; + opts.Events.EnableSideEffectsOnInlineProjections = true; + opts.Events.MetadataConfig.EnableAll(); + + opts.Advanced.DefaultTenantUsageEnabled = false; + + opts.Policies.AllDocumentsAreMultiTenanted(); + + opts.Policies.ForAllDocuments(m => + { + m.Metadata.CausationId.Enabled = true; + m.Metadata.CorrelationId.Enabled = true; + m.Metadata.Headers.Enabled = true; + m.Metadata.Version.Enabled = true; + }); + + opts.Projections.Add(lifecycle); + } +} + +public record Bug4268Registered(string Name); + +public class Bug4268Product +{ + public System.Guid Id { get; set; } + public string Name { get; set; } = ""; + + public void Apply(Bug4268Registered e) => Name = e.Name; +} + +public class Bug4268ProductProjection : SingleStreamProjection +{ + public Bug4268ProductProjection() + { + } + + public static Bug4268Product Create(Bug4268Registered @event) + => new() { Name = @event.Name }; + + public void Apply(Bug4268Registered @event, Bug4268Product state) + => state.Apply(@event); +} diff --git a/src/Marten/Events/Daemon/Internals/ProjectionBatch.cs b/src/Marten/Events/Daemon/Internals/ProjectionBatch.cs index d4752f7f68..54a27aa7b6 100644 --- a/src/Marten/Events/Daemon/Internals/ProjectionBatch.cs +++ b/src/Marten/Events/Daemon/Internals/ProjectionBatch.cs @@ -72,6 +72,20 @@ public async Task PublishMessageAsync(object message, string tenantId) await batch.PublishAsync(message, tenantId).ConfigureAwait(false); } + /// + /// Metadata-aware overload matching + /// . + /// Forwards to the underlying implementation with + /// full metadata so downstream consumers (e.g. Wolverine's + /// MartenToWolverineMessageBatch) can map it to their native delivery + /// options (correlation id, causation id, headers, user name). + /// + public async Task PublishMessageAsync(object message, MessageMetadata metadata) + { + var batch = await _batch.CurrentMessageBatch(_session).ConfigureAwait(false); + await batch.PublishAsync(message, metadata).ConfigureAwait(false); + } + public IDocumentOperations SessionForTenant(string tenantId) { if (tenantId.IsEmpty() || tenantId == StorageConstants.DefaultTenantId) diff --git a/src/Marten/MartenServiceCollectionExtensions.cs b/src/Marten/MartenServiceCollectionExtensions.cs index 3c2fed5177..c4041dc6e5 100644 --- a/src/Marten/MartenServiceCollectionExtensions.cs +++ b/src/Marten/MartenServiceCollectionExtensions.cs @@ -938,6 +938,60 @@ public void Configure(IServiceProvider services, StoreOptions options) options.InitialData.Add(_data); } } + + /// + /// Register a projection that depends on application services, against the default Marten store. + /// Equivalent to + /// but usable from any module that only has access to — + /// intended for modular-monolith setups where the AddMarten builder chain is + /// consumed once at composition root. See https://github.com/JasperFx/marten/issues/4267. + /// + /// + /// The projection lifecycle for Marten + /// + /// The IoC lifecycle for the projection instance. Note that the Transient lifetime will still + /// be treated as Scoped. + /// + /// Optional configuration of the projection name, version, event filtering, and async execution + /// The type of projection to register + public static IServiceCollection AddProjectionWithServices( + this IServiceCollection services, + ProjectionLifecycle lifecycle, + ServiceLifetime lifetime = ServiceLifetime.Scoped, + Action? configure = null) + where TProjection : class, IMartenRegistrable + { + TProjection.Register(services, lifecycle, lifetime, configure); + return services; + } + + /// + /// Register a projection that depends on application services, against the ancillary + /// Marten store . Equivalent to + /// + /// but usable from any module that only has access to . + /// See https://github.com/JasperFx/marten/issues/4267. + /// + /// + /// The projection lifecycle for Marten + /// + /// The IoC lifecycle for the projection instance. Note that the Transient lifetime will still + /// be treated as Scoped. + /// + /// Optional configuration of the projection name, version, event filtering, and async execution + /// The type of projection to register + /// The ancillary store interface the projection belongs to + public static IServiceCollection AddProjectionWithServices( + this IServiceCollection services, + ProjectionLifecycle lifecycle, + ServiceLifetime lifetime = ServiceLifetime.Scoped, + Action? configure = null) + where TProjection : class, IMartenRegistrable + where TStore : class, IDocumentStore + { + TProjection.Register(services, lifecycle, lifetime, configure); + return services; + } } public interface IGlobalConfigureMarten: IConfigureMarten;