Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/events/dcb.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,28 @@ private void ConfigureStore()

Each tag type gets its own table (`mt_event_tag_student`, `mt_event_tag_course`, etc.) with a composite primary key of `(value, seq_id)`.

### Automatic Tag Type Registration

When you register a `SingleStreamProjection<TDoc, TId>` or `MultiStreamProjection<TDoc, TId>` that uses a strong-typed identifier as its `TId`, Marten will **automatically register that type as a tag type** with `ForAggregate()` pointing to `TDoc`. This means you don't need to call `RegisterTagType<TId>()` explicitly in most cases:

```csharp
// The projection's TId (TicketId) is auto-registered as a tag type
opts.Projections.Add<TicketSummaryProjection>(ProjectionLifecycle.Inline);

// No need for: opts.Events.RegisterTagType<TicketId>().ForAggregate<TicketSummary>();
```

Auto-discovery only applies to strong-typed identifiers (e.g., `record struct TicketId(Guid Value)`). Primitive types like `Guid`, `string`, `int`, `long`, and `short` are not auto-registered.

If you explicitly register a tag type before auto-discovery runs, your explicit registration takes precedence. This lets you customize the table suffix when needed:

```csharp
// Explicit registration with custom table suffix — auto-discovery won't overwrite this
opts.Events.RegisterTagType<TicketId>("custom_ticket")
.ForAggregate<TicketSummary>();
opts.Projections.Add<TicketSummaryProjection>(ProjectionLifecycle.Inline);
```

### Tag Type Requirements

Tag types should be simple wrapper records around a primitive value:
Expand Down
139 changes: 139 additions & 0 deletions src/EventSourcingTests/Dcb/auto_discover_tag_types_from_projections.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#nullable enable
using System;
using System.Threading.Tasks;
using JasperFx.Events;
using JasperFx.Events.Projections;
using JasperFx.Events.Tags;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Dcb;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace EventSourcingTests.Dcb;

// Strong-typed identifier for auto-discovery tests
public record struct TicketId(Guid Value);

// Domain events
public record TicketOpened(string Title);
public record TicketResolved(string Resolution);

// Aggregate that uses the strong-typed ID as its document identity
public class TicketSummary
{
public TicketId Id { get; set; }
public string Title { get; set; } = "";
public string? Resolution { get; set; }

public void Apply(TicketOpened e) => Title = e.Title;
public void Apply(TicketResolved e) => Resolution = e.Resolution;
}

// Projection with strong-typed ID — auto-discovery should detect TicketId from TId
public class TicketSummaryProjection: SingleStreamProjection<TicketSummary, TicketId>
{
}

[Collection("OneOffs")]
public class auto_discover_tag_types_from_projections: OneOffConfigurationsContext, IAsyncLifetime
{
public Task InitializeAsync() => Task.CompletedTask;
public Task DisposeAsync() => Task.CompletedTask;

[Fact]
public void tag_type_is_auto_registered_from_single_stream_projection()
{
// Register the projection but do NOT explicitly call RegisterTagType<TicketId>()
StoreOptions(opts =>
{
opts.Projections.Add<TicketSummaryProjection>(ProjectionLifecycle.Inline);
});

// The tag type should have been auto-discovered from the projection's TId
var registration = theStore.Events.FindTagType(typeof(TicketId));
registration.ShouldNotBeNull();
registration.TagType.ShouldBe(typeof(TicketId));
registration.AggregateType.ShouldBe(typeof(TicketSummary));
}

[Fact]
public void explicit_registration_takes_precedence_over_auto_discovery()
{
// Explicitly register with a custom table suffix BEFORE auto-discovery runs
StoreOptions(opts =>
{
opts.Events.RegisterTagType<TicketId>("custom_ticket")
.ForAggregate<TicketSummary>();
opts.Projections.Add<TicketSummaryProjection>(ProjectionLifecycle.Inline);
});

var registration = theStore.Events.FindTagType(typeof(TicketId));
registration.ShouldNotBeNull();
registration.TableSuffix.ShouldBe("custom_ticket");
}

[Fact]
public void primitive_identity_types_are_not_auto_registered()
{
// A projection with a Guid identity (primitive) should NOT auto-register a tag type
StoreOptions(opts =>
{
opts.Projections.LiveStreamAggregation<StudentCourseEnrollment>();
});

theStore.Events.FindTagType(typeof(Guid)).ShouldBeNull();
}

[Fact]
public async Task auto_discovered_tag_type_works_for_querying()
{
// Register projection only — no explicit RegisterTagType call
StoreOptions(opts =>
{
opts.Projections.Add<TicketSummaryProjection>(ProjectionLifecycle.Inline);
opts.Events.AddEventType<TicketOpened>();
opts.Events.AddEventType<TicketResolved>();
});

var ticketId = new TicketId(Guid.NewGuid());
var streamId = Guid.NewGuid();

var opened = theSession.Events.BuildEvent(new TicketOpened("Fix bug"));
opened.WithTag(ticketId);
theSession.Events.Append(streamId, opened);
await theSession.SaveChangesAsync();

var query = new EventTagQuery().Or<TicketId>(ticketId);
var events = await theSession.Events.QueryByTagsAsync(query);
events.Count.ShouldBe(1);
events[0].Data.ShouldBeOfType<TicketOpened>();
}

[Fact]
public async Task auto_discovered_tag_type_works_for_fetch_for_writing()
{
StoreOptions(opts =>
{
opts.Projections.Add<TicketSummaryProjection>(ProjectionLifecycle.Inline);
opts.Events.AddEventType<TicketOpened>();
opts.Events.AddEventType<TicketResolved>();
});

var ticketId = new TicketId(Guid.NewGuid());
var streamId = Guid.NewGuid();

var opened = theSession.Events.BuildEvent(new TicketOpened("Add feature"));
opened.WithTag(ticketId);
theSession.Events.Append(streamId, opened);
await theSession.SaveChangesAsync();

var query = new EventTagQuery().Or<TicketId>(ticketId);
var boundary = await theSession.Events.FetchForWritingByTags<TicketSummary>(query);

boundary.Aggregate.ShouldNotBeNull();
boundary.Aggregate.Title.ShouldBe("Add feature");
boundary.Events.Count.ShouldBe(1);
}
}
31 changes: 30 additions & 1 deletion src/Marten/Events/Dcb/FetchForWritingByTagsHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Events;
using JasperFx.Events.Projections;
using JasperFx.Events.Tags;
using Marten.Internal;
using Marten.Internal.Sessions;
using Marten.Linq.QueryHandlers;
using NpgsqlTypes;
using Weasel.Postgresql;

namespace Marten.Events.Dcb;
Expand Down Expand Up @@ -90,7 +92,34 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session)
builder.Append(")");
}

builder.Append(") order by e.seq_id");
builder.Append(")");

// If the aggregator has event type filtering, apply it to limit the returned events
var eventTypeNames = resolveAggregatorEventTypeNames();
if (eventTypeNames != null)
{
builder.Append(" and e.type = ANY(");
var parameter = builder.AppendParameter(eventTypeNames);
parameter.NpgsqlDbType = NpgsqlDbType.Varchar | NpgsqlDbType.Array;
builder.Append(")");
}

builder.Append(" order by e.seq_id");
}

private string[]? resolveAggregatorEventTypeNames()
{
var aggregator = _store.Options.Projections.AggregatorFor<T>();
if (aggregator is not EventFilterable filterable) return null;

var includedTypes = filterable.IncludedEventTypes;
if (includedTypes.Count == 0 || includedTypes.Any(x => x.IsAbstract || x.IsInterface)) return null;

var additionalAliases = _store.Events.AliasesForEvents(includedTypes);
return includedTypes
.Select(x => _store.Events.EventMappingFor(x).Alias)
.Union(additionalAliases)
.ToArray();
}

public IEventBoundary<T> Handle(DbDataReader reader, IMartenSession session)
Expand Down
32 changes: 32 additions & 0 deletions src/Marten/Events/EventGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,38 @@ internal void Initialize(DocumentStore store)
{
mapping.JsonTransformation(null);
}

autoDiscoverTagTypesFromProjections();
}

private static readonly HashSet<Type> PrimitiveIdentityTypes =
[
typeof(Guid), typeof(string), typeof(int), typeof(long), typeof(short)
];

private static readonly System.Reflection.MethodInfo CreateTagTypeMethod =
typeof(TagTypeRegistration).GetMethod(nameof(TagTypeRegistration.Create))!;

private void autoDiscoverTagTypesFromProjections()
{
foreach (var projection in Options.Projections.All.OfType<IAggregateProjection>())
{
var identityType = projection.IdentityType;
if (identityType == null || PrimitiveIdentityTypes.Contains(identityType)) continue;
if (_tagTypes.Any(t => t.TagType == identityType)) continue;

try
{
var generic = CreateTagTypeMethod.MakeGenericMethod(identityType);
var registration = (ITagTypeRegistration)generic.Invoke(null, [null])!;
registration.ForAggregate(projection.AggregateType);
_tagTypes.Add(registration);
}
catch
{
// Not a valid strong-typed identifier — skip silently
}
}
}

IReadOnlyList<ICodeFile> ICodeFileCollection.BuildFiles()
Expand Down
43 changes: 39 additions & 4 deletions src/Marten/Events/EventStore.Dcb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Events;
using JasperFx.Events.Projections;
using JasperFx.Events.Tags;
using Marten.Events.Dcb;
using Marten.Internal.Sessions;
Expand All @@ -26,11 +27,17 @@ public async Task<bool> EventsExistAsync(EventTagQuery query, CancellationToken

public async Task<IReadOnlyList<IEvent>> QueryByTagsAsync(EventTagQuery query,
CancellationToken cancellation = default)
{
return await QueryByTagsAsync(query, null, cancellation).ConfigureAwait(false);
}

private async Task<IReadOnlyList<IEvent>> QueryByTagsAsync(EventTagQuery query,
string[]? aggregatorEventTypeNames, CancellationToken cancellation)
{
await _session.Database.EnsureStorageExistsAsync(typeof(IEvent), cancellation).ConfigureAwait(false);

var storage = (EventDocumentStorage)_session.EventStorage();
var (sql, paramValues) = BuildTagQuerySql(query, storage.SelectFields());
var (sql, paramValues) = BuildTagQuerySql(query, storage.SelectFields(), aggregatorEventTypeNames);
var cmd = new NpgsqlCommand(sql);
for (var i = 0; i < paramValues.Count; i++)
{
Expand All @@ -53,7 +60,8 @@ public async Task<IReadOnlyList<IEvent>> QueryByTagsAsync(EventTagQuery query,
public async Task<IEventBoundary<T>> FetchForWritingByTags<T>(EventTagQuery query,
CancellationToken cancellation = default) where T : class
{
var events = await QueryByTagsAsync(query, cancellation).ConfigureAwait(false);
var eventTypeNames = ResolveAggregatorEventTypeNames<T>();
var events = await QueryByTagsAsync(query, eventTypeNames, cancellation).ConfigureAwait(false);
var lastSeenSequence = events.Count > 0 ? events.Max(e => e.Sequence) : 0;

T? aggregate = default;
Expand Down Expand Up @@ -98,7 +106,8 @@ private static async Task<IReadOnlyList<IEvent>> ReadEventsFromReaderAsync(DbDat
return events;
}

private (string sql, List<object> parameters) BuildTagQuerySql(EventTagQuery query, string[] selectFields)
private (string sql, List<object> parameters) BuildTagQuerySql(EventTagQuery query, string[] selectFields,
string[]? aggregatorEventTypeNames = null)
{
var conditions = query.Conditions;
if (conditions.Count == 0)
Expand Down Expand Up @@ -174,8 +183,34 @@ private static async Task<IReadOnlyList<IEvent>> ReadEventsFromReaderAsync(DbDat
sb.Append(')');
}

sb.Append(") order by e.seq_id");
sb.Append(')');

// If the aggregator has event type filtering, apply it to limit the returned events
if (aggregatorEventTypeNames is { Length: > 0 })
{
sb.Append(" and e.type = ANY(@p");
sb.Append(paramValues.Count);
sb.Append(')');
paramValues.Add(aggregatorEventTypeNames);
}

sb.Append(" order by e.seq_id");

return (sb.ToString(), paramValues);
}

private string[]? ResolveAggregatorEventTypeNames<T>() where T : class
{
var aggregator = _store.Options.Projections.AggregatorFor<T>();
if (aggregator is not EventFilterable filterable) return null;

var includedTypes = filterable.IncludedEventTypes;
if (includedTypes.Count == 0 || includedTypes.Any(x => x.IsAbstract || x.IsInterface)) return null;

var additionalAliases = _store.Events.AliasesForEvents(includedTypes);
return includedTypes
.Select(x => _store.Events.EventMappingFor(x).Alias)
.Union(additionalAliases)
.ToArray();
}
}
Loading