Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using System;
using System.Threading.Tasks;
using JasperFx.Events.Aggregation;
using Marten;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace EventSourcingTests.Bugs;

public class Bug_4197_fetch_for_writing_natural_key : OneOffConfigurationsContext
{
// Types matching the user's repro
public sealed record Bug4197AggregateKey(string Value);

public sealed record Bug4197AggregateCreatedEvent(Guid Id, string Key);

public sealed class Bug4197Aggregate
{
public Guid Id { get; set; }

[NaturalKey]
public Bug4197AggregateKey Key { get; set; }

[NaturalKeySource]
public void Apply(Bug4197AggregateCreatedEvent e)
{
Id = e.Id;
Key = new Bug4197AggregateKey(e.Key);
}
}

[Fact]
public async Task fetch_for_writing_with_natural_key_without_explicit_projection_registration()
{
// This matches the user's repro: no explicit projection registration,
// just a self-aggregating type with [NaturalKey] and [NaturalKeySource].
// Marten should auto-discover the aggregate and its natural key.
StoreOptions(opts =>
{
// No explicit projection registration - relying on auto-discovery
});

// First call FetchForWriting to trigger auto-discovery, then apply schema
await using var session1 = theStore.LightweightSession();
var preCheck = await session1.Events.FetchForWriting<Bug4197Aggregate, Bug4197AggregateKey>(
new Bug4197AggregateKey("nonexistent"));
preCheck.Aggregate.ShouldBeNull();

// Now the projection is auto-registered, apply the schema
await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

await using var session = theStore.LightweightSession();

var aggregateId = Guid.NewGuid();
var aggregateKey = new Bug4197AggregateKey("randomkeyvalue");
var e = new Bug4197AggregateCreatedEvent(aggregateId, aggregateKey.Value);

session.Events.StartStream<Bug4197Aggregate>(aggregateId, e);
await session.SaveChangesAsync();

// This should NOT throw: InvalidOperationException: Invalid identifier type for aggregate
var stream = await session.Events.FetchForWriting<Bug4197Aggregate, Bug4197AggregateKey>(aggregateKey);

stream.ShouldNotBeNull();
stream.Aggregate.ShouldNotBeNull();
stream.Aggregate.Key.ShouldBe(aggregateKey);
}

[Fact]
public async Task fetch_for_writing_with_natural_key_with_inline_snapshot()
{
// This is the working pattern from the existing tests
StoreOptions(opts =>
{
opts.Projections.Snapshot<Bug4197Aggregate>(SnapshotLifecycle.Inline);
});

await using var session = theStore.LightweightSession();

var aggregateId = Guid.NewGuid();
var aggregateKey = new Bug4197AggregateKey("randomkeyvalue");
var e = new Bug4197AggregateCreatedEvent(aggregateId, aggregateKey.Value);

session.Events.StartStream<Bug4197Aggregate>(aggregateId, e);
await session.SaveChangesAsync();

var stream = await session.Events.FetchForWriting<Bug4197Aggregate, Bug4197AggregateKey>(aggregateKey);

stream.ShouldNotBeNull();
stream.Aggregate.ShouldNotBeNull();
stream.Aggregate.Key.ShouldBe(aggregateKey);
}
}
36 changes: 36 additions & 0 deletions src/Marten/Events/EventStore.FetchForWriting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using ImTools;
Expand All @@ -11,6 +13,8 @@
using JasperFx.Events;
using JasperFx.Events.Aggregation;
using JasperFx.Events.Projections;
using Marten.Events.Fetching;
using Marten.Events.Projections;
using Marten.Internal;
using Marten.Internal.Sessions;
using Marten.Internal.Storage;
Expand Down Expand Up @@ -246,6 +250,10 @@ private IAggregateFetchPlan<TDoc, TId> determineFetchPlan<TDoc, TId>(StoreOption
// before attempting the cast to IEventIdentityStrategy<TId>
if (typeof(TId) != typeof(Guid) && typeof(TId) != typeof(string))
{
// Auto-discover natural key from [NaturalKey] attribute on the aggregate type
// BEFORE iterating planners, so the projection is registered and available
tryAutoRegisterNaturalKeyProjection<TDoc, TId>(options);

foreach (var planner in options.Projections.allPlanners())
{
// Pass null identity - natural key planners don't use it
Expand All @@ -269,6 +277,34 @@ private IAggregateFetchPlan<TDoc, TId> determineFetchPlan<TDoc, TId>(StoreOption
throw new InvalidOperationException(
$"Unable to determine a fetch plan for aggregate {typeof(TDoc).FullNameInCode()}. Is there a valid single stream aggregation projection for this type?");
}

/// <summary>
/// Auto-discovers a natural key from [NaturalKey] attribute on the aggregate type
/// and registers an Inline snapshot projection if no projection exists yet.
/// This enables FetchForWriting with natural keys on self-aggregating types
/// without requiring explicit projection registration.
/// </summary>
private static void tryAutoRegisterNaturalKeyProjection<TDoc, TId>(StoreOptions options)
where TDoc : class where TId : notnull
{
// Skip if a projection is already registered for this aggregate type
if (options.Projections.TryFindAggregate(typeof(TDoc), out _))
{
return;
}

var naturalKeyProp = typeof(TDoc).GetProperties(BindingFlags.Public | BindingFlags.Instance)
.FirstOrDefault(p => p.GetCustomAttribute<NaturalKeyAttribute>() != null);

if (naturalKeyProp == null || naturalKeyProp.PropertyType != typeof(TId))
{
return;
}

// Register an Inline snapshot projection so the natural key infrastructure
// (natural key table, inline projection, NaturalKeyFetchPlanner) all activate
options.Projections.Snapshot<TDoc>(SnapshotLifecycle.Inline);
}
}

public interface IAggregateFetchPlan<TDoc, in TId> where TDoc : notnull
Expand Down
Loading