Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
<PackageVersion Include="FluentAssertions" Version="6.12.0" />
<PackageVersion Include="FSharp.Core" Version="9.0.100" />
<PackageVersion Include="FSharp.SystemTextJson" Version="1.3.13" />
<PackageVersion Include="JasperFx" Version="1.24.1" />
<PackageVersion Include="JasperFx.Events" Version="1.28.1" />
<PackageVersion Include="JasperFx" Version="1.26.0" />
<PackageVersion Include="JasperFx.Events" Version="1.29.0" />
<PackageVersion Include="JasperFx.Events.SourceGenerator" Version="1.5.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down Expand Up @@ -66,8 +66,8 @@
<PackageVersion Include="Swashbuckle.AspNetCore" Version="6.5.0" />
<PackageVersion Include="System.IO.Hashing" Version="10.0.3" />
<PackageVersion Include="Vogen" Version="7.0.0" />
<PackageVersion Include="Weasel.EntityFrameworkCore" Version="8.12.1" />
<PackageVersion Include="Weasel.Postgresql" Version="8.12.1" />
<PackageVersion Include="Weasel.EntityFrameworkCore" Version="8.14.1" />
<PackageVersion Include="Weasel.Postgresql" Version="8.14.1" />
<PackageVersion Include="WolverineFx.Marten" Version="4.2.0" />
<PackageVersion Include="xunit" Version="2.9.3" />
<PackageVersion Include="xunit.runner.visualstudio" Version="3.1.5" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using System.Linq;
using System.Threading.Tasks;
using JasperFx.Events.Projections;
using Marten;
using Marten.Storage;
using Marten.Testing.Harness;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Xunit;

namespace ContainerScopedProjectionTests;

/// <summary>
/// Regression coverage for GitHub issue #4267:
/// <c>AddProjectionWithServices</c> was only available on the
/// <c>MartenConfigurationExpression</c> / <c>MartenStoreExpression{T}</c>
/// builder chain returned by <c>AddMarten()</c>. In modular-monolith setups
/// individual modules only have <see cref="IServiceCollection"/> — the builder
/// has already been consumed at composition root — so the builder-only API is
/// unusable and forces callers into the undiscoverable
/// <c>TProjection.Register{T}(services, …)</c> JasperFx internal.
///
/// This suite asserts that the new <see cref="IServiceCollection"/> overloads
/// behave identically to the builder form for both the default store and
/// ancillary stores.
/// </summary>
[Collection("ioc")]
public class projections_registered_directly_on_IServiceCollection
{
[Fact]
public async Task add_projection_with_services_on_IServiceCollection_registers_for_default_store()
{
using var host = await Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddSingleton<IPriceLookup, PriceLookup>();

// Builder consumed once …
services.AddMarten(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = "ioc_4267_default";
opts.ApplyChangesLockId = opts.ApplyChangesLockId + 4267;
});

// … then a separate module/registration adds the projection using
// only IServiceCollection — no builder chain required.
services.AddProjectionWithServices<ProductProjection>(
ProjectionLifecycle.Inline,
ServiceLifetime.Singleton);
})
.StartAsync();

var store = host.Services.GetRequiredService<IDocumentStore>();

await using var session = store.LightweightSession();
var streamId = session.Events.StartStream<Product>(
new ProductRegistered("Ankle Socks", "Socks")).Id;
await session.SaveChangesAsync();

var product = await session.LoadAsync<Product>(streamId);
product.ShouldNotBeNull();
product!.Price.ShouldBeGreaterThan(0); // proves the IoC-resolved IPriceLookup ran
product.Name.ShouldBe("Ankle Socks");
}

[Fact]
public async Task add_projection_with_services_on_IServiceCollection_registers_for_ancillary_store()
{
using var host = await Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddSingleton<IPriceLookup, PriceLookup>();

services.AddMartenStore<IProductsStore>(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = "ioc_4267_ancillary";
opts.ApplyChangesLockId = opts.ApplyChangesLockId + 4268;
});

services.AddProjectionWithServices<ProductProjection, IProductsStore>(
ProjectionLifecycle.Inline,
ServiceLifetime.Singleton);
})
.StartAsync();

var store = host.Services.GetRequiredService<IProductsStore>();

await using var session = store.LightweightSession();
var streamId = session.Events.StartStream<Product>(
new ProductRegistered("Dress Socks", "Socks")).Id;
await session.SaveChangesAsync();

var product = await session.LoadAsync<Product>(streamId);
product.ShouldNotBeNull();
product!.Name.ShouldBe("Dress Socks");

// Projection materialized a table in the ancillary store's schema.
var tables = store.Storage.AllObjects().OfType<DocumentTable>();
tables.ShouldContain(x => x.DocumentType == typeof(Product));
}
}

public interface IProductsStore : IDocumentStore;
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
using System.Threading.Tasks;
using JasperFx;
using JasperFx.Events;
using JasperFx.Events.Projections;
using JasperFx.MultiTenancy;
using Marten;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Schema;
using Marten.Storage;
using Marten.Testing.Harness;
using Weasel.Core;
using Xunit;
using Xunit.Abstractions;

namespace EventSourcingTests.Bugs;

/// <summary>
/// Reproducer attempt for https://github.com/JasperFx/marten/issues/4268.
///
/// The reporter changed three existing <c>Async</c> multi-stream projections
/// to <c>Inline</c> (also flipping <c>EnableSideEffectsOnInlineProjections = true</c>),
/// then hit a migration failure on the next event append:
///
/// DDL Execution for 'All Configured Changes' Failed!
/// alter table public.mt_doc_envelope drop constraint pkey_mt_doc_envelope_tenant_id_id CASCADE;
/// alter table public.mt_doc_envelope add CONSTRAINT pkey_mt_doc_envelope_id PRIMARY KEY (id);
/// alter table public.mt_doc_envelope drop column tenant_id;
/// ...
/// ---> 0A000: unique constraint on partitioned table must include all
/// partitioning columns
///
/// Core mystery: the outgoing migration *drops* the tenant_id column and
/// replaces the composite pkey with a single-column pkey, which means Marten
/// believes the target table's tenancy style is Single — but the *existing*
/// table was built with Conjoined + partitioning, so the DROP fails.
///
/// This test runs the async → inline flip against a shared schema with
/// conjoined tenancy + archived-stream partitioning + envelope metadata
/// enabled on all documents (matching the reporter's CreateStoreOptions
/// helper) and expects the migration to succeed. If this test hangs or
/// throws, we have a reproducer. If it passes, we likely need more
/// info from the reporter.
/// </summary>
public class Bug_4268_projection_async_to_inline_migration : BugIntegrationContext
{
private readonly ITestOutputHelper _output;

public Bug_4268_projection_async_to_inline_migration(ITestOutputHelper output)
{
_output = output;
}

[Fact]
public async Task switching_projection_from_async_to_inline_does_not_break_migration()
{
// 1) Build the schema with the projection registered as Async.
StoreOptions(opts => ConfigureStore(opts, ProjectionLifecycle.Async));
await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

// 2) Append an event and commit so the per-document tables (envelope
// doc, product doc) are materialized with the async-shaped DDL.
var streamId = System.Guid.NewGuid();
await using (var session = theStore.LightweightSession("tenant-a"))
{
session.Events.StartStream<Bug4268Product>(streamId, new Bug4268Registered("Socks"));
await session.SaveChangesAsync();
}

// 3) Spin up a SEPARATE store targeting the SAME schema, but with
// the projection flipped to Inline + side-effects-on-inline turned
// on. This mirrors the reporter's deploy: same DB, new code.
var inline = SeparateStore(opts => ConfigureStore(opts, ProjectionLifecycle.Inline));

// 4) Force the migration to run over the full model rather than
// just the document types that a single SaveChanges would
// materialize. This mirrors the widest possible surface the
// reporter's running app could hit on the first deploy and is
// where Marten's schema diff would emit the failing
// "drop partitioning column" DDL if the shape we hypothesized
// actually reproduces.
await inline.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

// 5) And a subsequent append still has to succeed.
await using (var session = inline.LightweightSession("tenant-a"))
{
session.Events.StartStream<Bug4268Product>(
System.Guid.NewGuid(), new Bug4268Registered("Hats"));

// If the reporter's failure reproduces, this throws
// Marten.Exceptions.MartenSchemaException wrapping
// Npgsql.PostgresException 0A000.
await session.SaveChangesAsync();
}
}

private static void ConfigureStore(StoreOptions opts, ProjectionLifecycle lifecycle)
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = "bug_4268";

opts.TenantIdStyle = TenantIdStyle.ForceLowerCase;

opts.Events.StreamIdentity = StreamIdentity.AsGuid;
opts.Events.TenancyStyle = TenancyStyle.Conjoined;
opts.Events.UseIdentityMapForAggregates = true;
opts.Events.AppendMode = EventAppendMode.Quick;
opts.Events.UseArchivedStreamPartitioning = true;
opts.Events.EnableSideEffectsOnInlineProjections = true;
opts.Events.MetadataConfig.EnableAll();

opts.Advanced.DefaultTenantUsageEnabled = false;

opts.Policies.AllDocumentsAreMultiTenanted();

opts.Policies.ForAllDocuments(m =>
{
m.Metadata.CausationId.Enabled = true;
m.Metadata.CorrelationId.Enabled = true;
m.Metadata.Headers.Enabled = true;
m.Metadata.Version.Enabled = true;
});

opts.Projections.Add<Bug4268ProductProjection>(lifecycle);
}
}

public record Bug4268Registered(string Name);

public class Bug4268Product
{
public System.Guid Id { get; set; }
public string Name { get; set; } = "";

public void Apply(Bug4268Registered e) => Name = e.Name;
}

public class Bug4268ProductProjection : SingleStreamProjection<Bug4268Product, System.Guid>
{
public Bug4268ProductProjection()
{
}

public static Bug4268Product Create(Bug4268Registered @event)
=> new() { Name = @event.Name };

public void Apply(Bug4268Registered @event, Bug4268Product state)
=> state.Apply(@event);
}
14 changes: 14 additions & 0 deletions src/Marten/Events/Daemon/Internals/ProjectionBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,20 @@ public async Task PublishMessageAsync(object message, string tenantId)
await batch.PublishAsync(message, tenantId).ConfigureAwait(false);
}

/// <summary>
/// Metadata-aware overload matching
/// <see cref="IProjectionBatch.PublishMessageAsync(object, MessageMetadata)"/>.
/// Forwards to the underlying <see cref="IMessageSink"/> implementation with
/// full metadata so downstream consumers (e.g. Wolverine's
/// MartenToWolverineMessageBatch) can map it to their native delivery
/// options (correlation id, causation id, headers, user name).
/// </summary>
public async Task PublishMessageAsync(object message, MessageMetadata metadata)
{
var batch = await _batch.CurrentMessageBatch(_session).ConfigureAwait(false);
await batch.PublishAsync(message, metadata).ConfigureAwait(false);
}

public IDocumentOperations SessionForTenant(string tenantId)
{
if (tenantId.IsEmpty() || tenantId == StorageConstants.DefaultTenantId)
Expand Down
54 changes: 54 additions & 0 deletions src/Marten/MartenServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,60 @@ public void Configure(IServiceProvider services, StoreOptions options)
options.InitialData.Add(_data);
}
}

/// <summary>
/// Register a projection that depends on application services, against the default Marten store.
/// Equivalent to <see cref="MartenConfigurationExpression.AddProjectionWithServices{T}(ProjectionLifecycle, ServiceLifetime, Action{ProjectionBase})"/>
/// but usable from any module that only has access to <see cref="IServiceCollection"/> —
/// intended for modular-monolith setups where the <c>AddMarten</c> builder chain is
/// consumed once at composition root. See https://github.com/JasperFx/marten/issues/4267.
/// </summary>
/// <param name="services"></param>
/// <param name="lifecycle">The projection lifecycle for Marten</param>
/// <param name="lifetime">
/// The IoC lifecycle for the projection instance. Note that the Transient lifetime will still
/// be treated as Scoped.
/// </param>
/// <param name="configure">Optional configuration of the projection name, version, event filtering, and async execution</param>
/// <typeparam name="TProjection">The type of projection to register</typeparam>
public static IServiceCollection AddProjectionWithServices<TProjection>(
this IServiceCollection services,
ProjectionLifecycle lifecycle,
ServiceLifetime lifetime = ServiceLifetime.Scoped,
Action<ProjectionBase>? configure = null)
where TProjection : class, IMartenRegistrable
{
TProjection.Register<TProjection>(services, lifecycle, lifetime, configure);
return services;
}

/// <summary>
/// Register a projection that depends on application services, against the ancillary
/// Marten store <typeparamref name="TStore"/>. Equivalent to
/// <see cref="MartenStoreExpression{T}.AddProjectionWithServices{TProjection}(ProjectionLifecycle, ServiceLifetime, Action{ProjectionBase})"/>
/// but usable from any module that only has access to <see cref="IServiceCollection"/>.
/// See https://github.com/JasperFx/marten/issues/4267.
/// </summary>
/// <param name="services"></param>
/// <param name="lifecycle">The projection lifecycle for Marten</param>
/// <param name="lifetime">
/// The IoC lifecycle for the projection instance. Note that the Transient lifetime will still
/// be treated as Scoped.
/// </param>
/// <param name="configure">Optional configuration of the projection name, version, event filtering, and async execution</param>
/// <typeparam name="TProjection">The type of projection to register</typeparam>
/// <typeparam name="TStore">The ancillary store interface the projection belongs to</typeparam>
public static IServiceCollection AddProjectionWithServices<TProjection, TStore>(
this IServiceCollection services,
ProjectionLifecycle lifecycle,
ServiceLifetime lifetime = ServiceLifetime.Scoped,
Action<ProjectionBase>? configure = null)
where TProjection : class, IMartenRegistrable
where TStore : class, IDocumentStore
{
TProjection.Register<TProjection, TStore>(services, lifecycle, lifetime, configure);
return services;
}
}

public interface IGlobalConfigureMarten: IConfigureMarten;
Expand Down
Loading