diff --git a/docs/events/dcb.md b/docs/events/dcb.md index 11f35b6ebb..e652237b82 100644 --- a/docs/events/dcb.md +++ b/docs/events/dcb.md @@ -42,6 +42,28 @@ private void ConfigureStore() Each tag type gets its own table (`mt_event_tag_student`, `mt_event_tag_course`, etc.) with a composite primary key of `(value, seq_id)`. +### Automatic Tag Type Registration + +When you register a `SingleStreamProjection` or `MultiStreamProjection` that uses a strong-typed identifier as its `TId`, Marten will **automatically register that type as a tag type** with `ForAggregate()` pointing to `TDoc`. This means you don't need to call `RegisterTagType()` explicitly in most cases: + +```csharp +// The projection's TId (TicketId) is auto-registered as a tag type +opts.Projections.Add(ProjectionLifecycle.Inline); + +// No need for: opts.Events.RegisterTagType().ForAggregate(); +``` + +Auto-discovery only applies to strong-typed identifiers (e.g., `record struct TicketId(Guid Value)`). Primitive types like `Guid`, `string`, `int`, `long`, and `short` are not auto-registered. + +If you explicitly register a tag type before auto-discovery runs, your explicit registration takes precedence. This lets you customize the table suffix when needed: + +```csharp +// Explicit registration with custom table suffix — auto-discovery won't overwrite this +opts.Events.RegisterTagType("custom_ticket") + .ForAggregate(); +opts.Projections.Add(ProjectionLifecycle.Inline); +``` + ### Tag Type Requirements Tag types should be simple wrapper records around a primitive value: diff --git a/src/EventSourcingTests/Dcb/auto_discover_tag_types_from_projections.cs b/src/EventSourcingTests/Dcb/auto_discover_tag_types_from_projections.cs new file mode 100644 index 0000000000..8fa6934c99 --- /dev/null +++ b/src/EventSourcingTests/Dcb/auto_discover_tag_types_from_projections.cs @@ -0,0 +1,139 @@ +#nullable enable +using System; +using System.Threading.Tasks; +using JasperFx.Events; +using JasperFx.Events.Projections; +using JasperFx.Events.Tags; +using Marten.Events; +using Marten.Events.Aggregation; +using Marten.Events.Dcb; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.Dcb; + +// Strong-typed identifier for auto-discovery tests +public record struct TicketId(Guid Value); + +// Domain events +public record TicketOpened(string Title); +public record TicketResolved(string Resolution); + +// Aggregate that uses the strong-typed ID as its document identity +public class TicketSummary +{ + public TicketId Id { get; set; } + public string Title { get; set; } = ""; + public string? Resolution { get; set; } + + public void Apply(TicketOpened e) => Title = e.Title; + public void Apply(TicketResolved e) => Resolution = e.Resolution; +} + +// Projection with strong-typed ID — auto-discovery should detect TicketId from TId +public class TicketSummaryProjection: SingleStreamProjection +{ +} + +[Collection("OneOffs")] +public class auto_discover_tag_types_from_projections: OneOffConfigurationsContext, IAsyncLifetime +{ + public Task InitializeAsync() => Task.CompletedTask; + public Task DisposeAsync() => Task.CompletedTask; + + [Fact] + public void tag_type_is_auto_registered_from_single_stream_projection() + { + // Register the projection but do NOT explicitly call RegisterTagType() + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + }); + + // The tag type should have been auto-discovered from the projection's TId + var registration = theStore.Events.FindTagType(typeof(TicketId)); + registration.ShouldNotBeNull(); + registration.TagType.ShouldBe(typeof(TicketId)); + registration.AggregateType.ShouldBe(typeof(TicketSummary)); + } + + [Fact] + public void explicit_registration_takes_precedence_over_auto_discovery() + { + // Explicitly register with a custom table suffix BEFORE auto-discovery runs + StoreOptions(opts => + { + opts.Events.RegisterTagType("custom_ticket") + .ForAggregate(); + opts.Projections.Add(ProjectionLifecycle.Inline); + }); + + var registration = theStore.Events.FindTagType(typeof(TicketId)); + registration.ShouldNotBeNull(); + registration.TableSuffix.ShouldBe("custom_ticket"); + } + + [Fact] + public void primitive_identity_types_are_not_auto_registered() + { + // A projection with a Guid identity (primitive) should NOT auto-register a tag type + StoreOptions(opts => + { + opts.Projections.LiveStreamAggregation(); + }); + + theStore.Events.FindTagType(typeof(Guid)).ShouldBeNull(); + } + + [Fact] + public async Task auto_discovered_tag_type_works_for_querying() + { + // Register projection only — no explicit RegisterTagType call + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + opts.Events.AddEventType(); + opts.Events.AddEventType(); + }); + + var ticketId = new TicketId(Guid.NewGuid()); + var streamId = Guid.NewGuid(); + + var opened = theSession.Events.BuildEvent(new TicketOpened("Fix bug")); + opened.WithTag(ticketId); + theSession.Events.Append(streamId, opened); + await theSession.SaveChangesAsync(); + + var query = new EventTagQuery().Or(ticketId); + var events = await theSession.Events.QueryByTagsAsync(query); + events.Count.ShouldBe(1); + events[0].Data.ShouldBeOfType(); + } + + [Fact] + public async Task auto_discovered_tag_type_works_for_fetch_for_writing() + { + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + opts.Events.AddEventType(); + opts.Events.AddEventType(); + }); + + var ticketId = new TicketId(Guid.NewGuid()); + var streamId = Guid.NewGuid(); + + var opened = theSession.Events.BuildEvent(new TicketOpened("Add feature")); + opened.WithTag(ticketId); + theSession.Events.Append(streamId, opened); + await theSession.SaveChangesAsync(); + + var query = new EventTagQuery().Or(ticketId); + var boundary = await theSession.Events.FetchForWritingByTags(query); + + boundary.Aggregate.ShouldNotBeNull(); + boundary.Aggregate.Title.ShouldBe("Add feature"); + boundary.Events.Count.ShouldBe(1); + } +} diff --git a/src/Marten/Events/Dcb/FetchForWritingByTagsHandler.cs b/src/Marten/Events/Dcb/FetchForWritingByTagsHandler.cs index f4cca3a478..063cf6c28e 100644 --- a/src/Marten/Events/Dcb/FetchForWritingByTagsHandler.cs +++ b/src/Marten/Events/Dcb/FetchForWritingByTagsHandler.cs @@ -7,10 +7,12 @@ using System.Threading; using System.Threading.Tasks; using JasperFx.Events; +using JasperFx.Events.Projections; using JasperFx.Events.Tags; using Marten.Internal; using Marten.Internal.Sessions; using Marten.Linq.QueryHandlers; +using NpgsqlTypes; using Weasel.Postgresql; namespace Marten.Events.Dcb; @@ -90,7 +92,34 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) builder.Append(")"); } - builder.Append(") order by e.seq_id"); + builder.Append(")"); + + // If the aggregator has event type filtering, apply it to limit the returned events + var eventTypeNames = resolveAggregatorEventTypeNames(); + if (eventTypeNames != null) + { + builder.Append(" and e.type = ANY("); + var parameter = builder.AppendParameter(eventTypeNames); + parameter.NpgsqlDbType = NpgsqlDbType.Varchar | NpgsqlDbType.Array; + builder.Append(")"); + } + + builder.Append(" order by e.seq_id"); + } + + private string[]? resolveAggregatorEventTypeNames() + { + var aggregator = _store.Options.Projections.AggregatorFor(); + if (aggregator is not EventFilterable filterable) return null; + + var includedTypes = filterable.IncludedEventTypes; + if (includedTypes.Count == 0 || includedTypes.Any(x => x.IsAbstract || x.IsInterface)) return null; + + var additionalAliases = _store.Events.AliasesForEvents(includedTypes); + return includedTypes + .Select(x => _store.Events.EventMappingFor(x).Alias) + .Union(additionalAliases) + .ToArray(); } public IEventBoundary Handle(DbDataReader reader, IMartenSession session) diff --git a/src/Marten/Events/EventGraph.cs b/src/Marten/Events/EventGraph.cs index 1969bc8883..0b6d3745e7 100644 --- a/src/Marten/Events/EventGraph.cs +++ b/src/Marten/Events/EventGraph.cs @@ -570,6 +570,38 @@ internal void Initialize(DocumentStore store) { mapping.JsonTransformation(null); } + + autoDiscoverTagTypesFromProjections(); + } + + private static readonly HashSet PrimitiveIdentityTypes = + [ + typeof(Guid), typeof(string), typeof(int), typeof(long), typeof(short) + ]; + + private static readonly System.Reflection.MethodInfo CreateTagTypeMethod = + typeof(TagTypeRegistration).GetMethod(nameof(TagTypeRegistration.Create))!; + + private void autoDiscoverTagTypesFromProjections() + { + foreach (var projection in Options.Projections.All.OfType()) + { + var identityType = projection.IdentityType; + if (identityType == null || PrimitiveIdentityTypes.Contains(identityType)) continue; + if (_tagTypes.Any(t => t.TagType == identityType)) continue; + + try + { + var generic = CreateTagTypeMethod.MakeGenericMethod(identityType); + var registration = (ITagTypeRegistration)generic.Invoke(null, [null])!; + registration.ForAggregate(projection.AggregateType); + _tagTypes.Add(registration); + } + catch + { + // Not a valid strong-typed identifier — skip silently + } + } } IReadOnlyList ICodeFileCollection.BuildFiles() diff --git a/src/Marten/Events/EventStore.Dcb.cs b/src/Marten/Events/EventStore.Dcb.cs index 103b44b682..089f6d1693 100644 --- a/src/Marten/Events/EventStore.Dcb.cs +++ b/src/Marten/Events/EventStore.Dcb.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Tasks; using JasperFx.Events; +using JasperFx.Events.Projections; using JasperFx.Events.Tags; using Marten.Events.Dcb; using Marten.Internal.Sessions; @@ -26,11 +27,17 @@ public async Task EventsExistAsync(EventTagQuery query, CancellationToken public async Task> QueryByTagsAsync(EventTagQuery query, CancellationToken cancellation = default) + { + return await QueryByTagsAsync(query, null, cancellation).ConfigureAwait(false); + } + + private async Task> QueryByTagsAsync(EventTagQuery query, + string[]? aggregatorEventTypeNames, CancellationToken cancellation) { await _session.Database.EnsureStorageExistsAsync(typeof(IEvent), cancellation).ConfigureAwait(false); var storage = (EventDocumentStorage)_session.EventStorage(); - var (sql, paramValues) = BuildTagQuerySql(query, storage.SelectFields()); + var (sql, paramValues) = BuildTagQuerySql(query, storage.SelectFields(), aggregatorEventTypeNames); var cmd = new NpgsqlCommand(sql); for (var i = 0; i < paramValues.Count; i++) { @@ -53,7 +60,8 @@ public async Task> QueryByTagsAsync(EventTagQuery query, public async Task> FetchForWritingByTags(EventTagQuery query, CancellationToken cancellation = default) where T : class { - var events = await QueryByTagsAsync(query, cancellation).ConfigureAwait(false); + var eventTypeNames = ResolveAggregatorEventTypeNames(); + var events = await QueryByTagsAsync(query, eventTypeNames, cancellation).ConfigureAwait(false); var lastSeenSequence = events.Count > 0 ? events.Max(e => e.Sequence) : 0; T? aggregate = default; @@ -98,7 +106,8 @@ private static async Task> ReadEventsFromReaderAsync(DbDat return events; } - private (string sql, List parameters) BuildTagQuerySql(EventTagQuery query, string[] selectFields) + private (string sql, List parameters) BuildTagQuerySql(EventTagQuery query, string[] selectFields, + string[]? aggregatorEventTypeNames = null) { var conditions = query.Conditions; if (conditions.Count == 0) @@ -174,8 +183,34 @@ private static async Task> ReadEventsFromReaderAsync(DbDat sb.Append(')'); } - sb.Append(") order by e.seq_id"); + sb.Append(')'); + + // If the aggregator has event type filtering, apply it to limit the returned events + if (aggregatorEventTypeNames is { Length: > 0 }) + { + sb.Append(" and e.type = ANY(@p"); + sb.Append(paramValues.Count); + sb.Append(')'); + paramValues.Add(aggregatorEventTypeNames); + } + + sb.Append(" order by e.seq_id"); return (sb.ToString(), paramValues); } + + private string[]? ResolveAggregatorEventTypeNames() where T : class + { + var aggregator = _store.Options.Projections.AggregatorFor(); + if (aggregator is not EventFilterable filterable) return null; + + var includedTypes = filterable.IncludedEventTypes; + if (includedTypes.Count == 0 || includedTypes.Any(x => x.IsAbstract || x.IsInterface)) return null; + + var additionalAliases = _store.Events.AliasesForEvents(includedTypes); + return includedTypes + .Select(x => _store.Events.EventMappingFor(x).Alias) + .Union(additionalAliases) + .ToArray(); + } }