Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
using System;
using System.Threading.Tasks;
using JasperFx.Events;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace EventSourcingTests.Aggregation;

public class live_aggregation_without_an_aggregate_identifier : OneOffConfigurationsContext
{
[Fact]
public async Task live_aggregation_with_guid_identifiers()
{
StoreOptions(opts =>
{
opts.Events.StreamIdentity = StreamIdentity.AsGuid;
});

var streamId = Guid.NewGuid();
theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent());
await theSession.SaveChangesAsync();

var version1 = await theSession.Events.AggregateStreamAsync<CountOfLetters>(streamId);

var version2 = await theSession.Events.FetchLatest<CountOfLetters>(streamId);

var version3 = await theSession.Events.FetchForWriting<CountOfLetters>(streamId);

var version4 = await theSession.Events.AggregateStreamToLastKnownAsync<CountOfLetters>(streamId);

version1.ACount.ShouldBe(1);
version1.BCount.ShouldBe(2);
version1.CCount.ShouldBe(1);
version1.DCount.ShouldBe(0);

version2.ACount.ShouldBe(1);
version2.BCount.ShouldBe(2);
version2.CCount.ShouldBe(1);
version2.DCount.ShouldBe(0);

version3.Aggregate.ACount.ShouldBe(1);
version3.Aggregate.BCount.ShouldBe(2);
version3.Aggregate.CCount.ShouldBe(1);
version3.Aggregate.DCount.ShouldBe(0);

version4.ACount.ShouldBe(1);
version4.BCount.ShouldBe(2);
version4.CCount.ShouldBe(1);
version4.DCount.ShouldBe(0);
}

[Fact]
public async Task live_aggregation_with_string_identifiers()
{
StoreOptions(opts =>
{
opts.Events.StreamIdentity = StreamIdentity.AsString;
});

var streamId = Guid.NewGuid().ToString();
theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent());
await theSession.SaveChangesAsync();

var version1 = await theSession.Events.AggregateStreamAsync<CountOfLetters>(streamId);

var version2 = await theSession.Events.FetchLatest<CountOfLetters>(streamId);

var version3 = await theSession.Events.FetchForWriting<CountOfLetters>(streamId);

var version4 = await theSession.Events.AggregateStreamToLastKnownAsync<CountOfLetters>(streamId);

version1.ACount.ShouldBe(1);
version1.BCount.ShouldBe(2);
version1.CCount.ShouldBe(1);
version1.DCount.ShouldBe(0);

version2.ACount.ShouldBe(1);
version2.BCount.ShouldBe(2);
version2.CCount.ShouldBe(1);
version2.DCount.ShouldBe(0);

version3.Aggregate.ACount.ShouldBe(1);
version3.Aggregate.BCount.ShouldBe(2);
version3.Aggregate.CCount.ShouldBe(1);
version3.Aggregate.DCount.ShouldBe(0);

version4.ACount.ShouldBe(1);
version4.BCount.ShouldBe(2);
version4.CCount.ShouldBe(1);
version4.DCount.ShouldBe(0);
}
}


public class CountOfLetters
{
public int ACount { get; set; }
public int BCount { get; set; }
public int CCount { get; set; }
public int DCount { get; set; }
public int ECount { get; set; }

public void Apply(AEvent _)
{
ACount++;
}

public void Apply(BEvent _)
{
BCount++;
}

public void Apply(CEvent _)
{
CCount++;
}

public void Apply(DEvent _)
{
DCount++;
}

public void Apply(EEvent _)
{
ECount++;
}

}

15 changes: 15 additions & 0 deletions src/Marten/Events/EventGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,21 @@ internal EventGraph(StoreOptions options)
public IAggregatorSource<IQuerySession>? Build<TDoc>()
{
var idType = Options.Storage.MappingFor(typeof(TDoc)).IdType;

// For the quite legitimate case of doing a live aggregation when
// there is no Id member
if (idType == null)
{
if (StreamIdentity == StreamIdentity.AsGuid)
{
idType = typeof(Guid);
}
else
{
idType = typeof(string);
}
}

return typeof(SingleStreamProjection<,>)
.CloseAndBuildAs<IAggregatorSource<IQuerySession>>(typeof(TDoc), idType);
}
Expand Down
15 changes: 8 additions & 7 deletions src/Marten/Events/EventStore.FetchForWriting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using JasperFx.Core;
using JasperFx.Core.Reflection;
using JasperFx.Events;
using JasperFx.Events.Aggregation;
using JasperFx.Events.Projections;
using Marten.Internal;
using Marten.Internal.Sessions;
Expand Down Expand Up @@ -195,24 +196,24 @@ internal IAggregateFetchPlan<TDoc, TId> FindFetchPlan<TDoc, TId>() where TDoc :
return (IAggregateFetchPlan<TDoc, TId>)stored;
}

var storage = _store.Options.ResolveCorrectedDocumentStorage<TDoc, TId>(DocumentTracking.IdentityOnly);

var plan = determineFetchPlan(storage, _session.Options);
var plan = determineFetchPlan<TDoc, TId>(_session.Options);

_fetchStrategies = _fetchStrategies.AddOrUpdate(typeof(TDoc), plan);

return plan;
}

private IAggregateFetchPlan<TDoc, TId> determineFetchPlan<TDoc, TId>(IDocumentStorage<TDoc, TId> storage,
StoreOptions options) where TDoc : class where TId : notnull
private IAggregateFetchPlan<TDoc, TId> determineFetchPlan<TDoc, TId>(StoreOptions options) where TDoc : class where TId : notnull
{
foreach (var planner in options.Projections.allPlanners())
{
if (planner.TryMatch(storage, (IEventIdentityStrategy<TId>)this, options, out var plan)) return plan;
if (planner.TryMatch<TDoc, TId>((IEventIdentityStrategy<TId>)this, options, out var plan))
{
return plan;
}
}

throw new ArgumentOutOfRangeException(nameof(storage),
throw new InvalidOperationException(
$"Unable to determine a fetch plan for aggregate {typeof(TDoc).FullNameInCode()}. Is there a valid single stream aggregation projection for this type?");
}
}
Expand Down
13 changes: 6 additions & 7 deletions src/Marten/Events/Fetching/AsyncFetchPlanner.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Diagnostics.CodeAnalysis;
using JasperFx.Core.Reflection;
using JasperFx.Events.Aggregation;
using JasperFx.Events.Daemon;
using JasperFx.Events.Projections;
using Marten.Events.Projections;
Expand All @@ -11,8 +12,9 @@ namespace Marten.Events.Fetching;

internal class AsyncFetchPlanner: IFetchPlanner
{
public bool TryMatch<TDoc, TId>(IDocumentStorage<TDoc, TId> storage, IEventIdentityStrategy<TId> identity, StoreOptions options,
[NotNullWhen(true)]out IAggregateFetchPlan<TDoc, TId>? plan) where TDoc : class where TId : notnull
public bool TryMatch<TDoc, TId>(IEventIdentityStrategy<TId> identity,
StoreOptions options,
[NotNullWhen(true)] out IAggregateFetchPlan<TDoc, TId>? plan) where TDoc : class where TId : notnull
{
if (options.Projections.TryFindAggregate(typeof(TDoc), out var projection))
{
Expand All @@ -22,8 +24,6 @@ public bool TryMatch<TDoc, TId>(IDocumentStorage<TDoc, TId> storage, IEventIdent
$"The aggregate type {typeof(TDoc).FullNameInCode()} is the subject of a multi-stream projection and cannot be used with FetchForWriting");
}



if (projection.Scope == AggregationScope.MultiStream)
{
throw new InvalidOperationException(
Expand All @@ -32,10 +32,9 @@ public bool TryMatch<TDoc, TId>(IDocumentStorage<TDoc, TId> storage, IEventIdent

if (projection.Lifecycle == ProjectionLifecycle.Async)
{
var mapping = options.Storage.FindMapping(typeof(TDoc)) as DocumentMapping;
if (mapping != null && mapping.Metadata.Revision.Enabled)
if (options.Storage.FindMapping(typeof(TDoc)) is DocumentMapping { Metadata.Revision.Enabled: true })
{
plan = new FetchAsyncPlan<TDoc, TId>(options.EventGraph, identity, storage);
plan = new FetchAsyncPlan<TDoc, TId>(options.EventGraph, identity, options.ResolveCorrectedDocumentStorage<TDoc, TId>(DocumentTracking.IdentityOnly));
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Marten.Exceptions;
using Marten.Internal;
using Marten.Internal.Sessions;
using Marten.Internal.Storage;
using Marten.Linq.QueryHandlers;
using Npgsql;
using Weasel.Postgresql;
Expand All @@ -29,7 +30,7 @@ public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase sessio

builder.StartNewCommand();

var handler = new LoadByIdHandler<TDoc, TId>(storage, id);
var handler = new LoadByIdHandler<TDoc, TId>((IDocumentStorage<TDoc, TId>)storage, id);
handler.ConfigureCommand(builder, session);

await using var reader =
Expand Down Expand Up @@ -91,7 +92,7 @@ public IQueryHandler<IEventStream<TDoc>> BuildQueryHandler(QuerySession session,
{
session.AssertIsDocumentSession();
var storage = findDocumentStorage(session);
var handler = new LoadByIdHandler<TDoc, TId>(storage, id);
var handler = new LoadByIdHandler<TDoc, TId>((IDocumentStorage<TDoc, TId>)storage, id);
return new WithStartingVersionHandler(this, id, handler, expectedStartingVersion);
}

Expand Down
3 changes: 2 additions & 1 deletion src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Threading;
using System.Threading.Tasks;
using Marten.Internal.Sessions;
using Marten.Internal.Storage;
using Marten.Linq.QueryHandlers;
using Weasel.Postgresql;

Expand All @@ -23,7 +24,7 @@ internal partial class FetchInlinedPlan<TDoc, TId>
var builder = new BatchBuilder { TenantId = session.TenantId };
builder.Append(";");

var handler = new LoadByIdHandler<TDoc, TId>(storage, id);
var handler = new LoadByIdHandler<TDoc, TId>((IDocumentStorage<TDoc, TId>)storage, id);
handler.ConfigureCommand(builder, session);

await using var reader =
Expand Down
1 change: 1 addition & 0 deletions src/Marten/Events/Fetching/FetchInlinedPlan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading.Tasks;
using JasperFx;
using JasperFx.Core.Reflection;
using JasperFx.Events.Aggregation;
using JasperFx.Events.Projections;
using Marten.Exceptions;
using Marten.Internal;
Expand Down
21 changes: 17 additions & 4 deletions src/Marten/Events/Fetching/FetchLivePlan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading.Tasks;
using JasperFx;
using JasperFx.Core.Reflection;
using JasperFx.Events;
using JasperFx.Events.Aggregation;
using JasperFx.Events.Projections;
using Marten.Exceptions;
Expand All @@ -17,11 +18,11 @@ namespace Marten.Events.Fetching;
internal partial class FetchLivePlan<TDoc, TId>: IAggregateFetchPlan<TDoc, TId> where TDoc : class where TId : notnull
{
private readonly IAggregator<TDoc, TId, IQuerySession> _aggregator;
private readonly IDocumentStorage<TDoc, TId> _documentStorage;
private readonly IIdentitySetter<TDoc, TId> _documentStorage;
private readonly IEventIdentityStrategy<TId> _identityStrategy;

public FetchLivePlan(EventGraph events, IEventIdentityStrategy<TId> identityStrategy,
IDocumentStorage<TDoc, TId> documentStorage)
IIdentitySetter<TDoc, TId> documentStorage)
{
IsGlobal = events.GlobalAggregates.Contains(typeof(TDoc));

Expand All @@ -30,8 +31,20 @@ public FetchLivePlan(EventGraph events, IEventIdentityStrategy<TId> identityStra

var raw = events.Options.Projections.AggregatorFor<TDoc>();

_aggregator = raw as IAggregator<TDoc, TId, IQuerySession>
?? typeof(IdentityForwardingAggregator<,,,>).CloseAndBuildAs<IAggregator<TDoc, TId, IQuerySession>>(raw, _documentStorage, typeof(TDoc), _documentStorage.IdType, typeof(TId), typeof(IQuerySession));
// yeah, I know, this is kind of gross
if (documentStorage is NulloIdentitySetter<TDoc, TId>)
{
_aggregator = (IAggregator<TDoc, TId, IQuerySession>?)raw;
}
else
{
var simpleType = events.StreamIdentity == StreamIdentity.AsGuid ? typeof(Guid) : typeof(string);
var idType = documentStorage is IDocumentStorage<TDoc, TId> s ? s.IdType : typeof(TId);

// The goofy identity forwarding thing is to deal with custom value types. Of course.
_aggregator = raw as IAggregator<TDoc, TId, IQuerySession>
?? typeof(IdentityForwardingAggregator<,,,>).CloseAndBuildAs<IAggregator<TDoc, TId, IQuerySession>>(raw, _documentStorage, typeof(TDoc), idType, simpleType, typeof(IQuerySession));
}
}

public bool IsGlobal { get; }
Expand Down
5 changes: 3 additions & 2 deletions src/Marten/Events/Fetching/IFetchPlanner.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using System.Diagnostics.CodeAnalysis;
using JasperFx.Events.Aggregation;
using Marten.Internal.Storage;

namespace Marten.Events.Fetching;

public interface IFetchPlanner
{
bool TryMatch<TDoc, TId>(IDocumentStorage<TDoc, TId> storage, IEventIdentityStrategy<TId> identity,
StoreOptions options, [NotNullWhen(true)]out IAggregateFetchPlan<TDoc, TId>? plan) where TDoc : class where TId : notnull;
bool TryMatch<TDoc, TId>(IEventIdentityStrategy<TId> identity,
StoreOptions options, [NotNullWhen(true)] out IAggregateFetchPlan<TDoc, TId>? plan) where TDoc : class where TId : notnull;
}
6 changes: 4 additions & 2 deletions src/Marten/Events/Fetching/InlineFetchPlanner.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
using System.Diagnostics.CodeAnalysis;
using JasperFx.Events.Aggregation;
using JasperFx.Events.Projections;
using Marten.Internal.Storage;

namespace Marten.Events.Fetching;

internal class InlineFetchPlanner : IFetchPlanner
{
public bool TryMatch<TDoc, TId>(IDocumentStorage<TDoc, TId> storage, IEventIdentityStrategy<TId> identity, StoreOptions options,
[NotNullWhen(true)]out IAggregateFetchPlan<TDoc, TId>? plan) where TDoc : class where TId : notnull
public bool TryMatch<TDoc, TId>(IEventIdentityStrategy<TId> identity,
StoreOptions options,
[NotNullWhen(true)] out IAggregateFetchPlan<TDoc, TId>? plan) where TDoc : class where TId : notnull
{
if (options.Projections.TryFindAggregate(typeof(TDoc), out var projection))
{
Expand Down
21 changes: 18 additions & 3 deletions src/Marten/Events/Fetching/LiveFetchPlanner.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
using System.Diagnostics.CodeAnalysis;
using JasperFx.Events.Aggregation;
using Marten.Exceptions;
using Marten.Internal.Storage;

namespace Marten.Events.Fetching;

internal class LiveFetchPlanner: IFetchPlanner
{
public bool TryMatch<TDoc, TId>(IDocumentStorage<TDoc, TId> storage, IEventIdentityStrategy<TId> identity,
StoreOptions options, [NotNullWhen(true)]out IAggregateFetchPlan<TDoc, TId>? plan) where TDoc : class where TId : notnull
public bool TryMatch<TDoc, TId>(IEventIdentityStrategy<TId> identity,
StoreOptions options, [NotNullWhen(true)] out IAggregateFetchPlan<TDoc, TId>? plan) where TDoc : class where TId : notnull
{
plan = new FetchLivePlan<TDoc, TId>(options.EventGraph, identity, storage);
IIdentitySetter<TDoc, TId> identitySetter = new NulloIdentitySetter<TDoc, TId>();

// Yeah, this is smelly, but at least it would only happen *once* at runtime
try
{
// Try to overwrite w/ a real document storage
identitySetter = options.ResolveCorrectedDocumentStorage<TDoc, TId>(DocumentTracking.IdentityOnly);
}
catch (InvalidDocumentException)
{
// there's no identity, just use the nullo strategy
}

plan = new FetchLivePlan<TDoc, TId>(options.EventGraph, identity, identitySetter);
return true;
}
}
Loading
Loading