diff --git a/src/Marten.Testing/Events/Projections/AggregatorTests.cs b/src/Marten.Testing/Events/Projections/AggregatorTests.cs index e6063f87cd..aa93ea2e7e 100644 --- a/src/Marten.Testing/Events/Projections/AggregatorTests.cs +++ b/src/Marten.Testing/Events/Projections/AggregatorTests.cs @@ -69,7 +69,7 @@ public void build_a_series_of_events() .Add(new MembersJoined {Members = new string[] {"Gandalf", "Boromir", "Gimli", "Legolas"}}) .Add(new MembersDeparted() {Members = new string[] {"Frodo", "Sam"}}); - var party = theAggregator.Build(stream.Events); + var party = theAggregator.Build(stream.Events, null); party.Name.ShouldBe("Destroy the Ring"); diff --git a/src/Marten/Events/AggregationQueryHandler.cs b/src/Marten/Events/AggregationQueryHandler.cs index 4165537194..ec5eb680b0 100644 --- a/src/Marten/Events/AggregationQueryHandler.cs +++ b/src/Marten/Events/AggregationQueryHandler.cs @@ -11,13 +11,15 @@ namespace Marten.Events { internal class AggregationQueryHandler : IQueryHandler where T : class, new() { - private readonly Aggregator _aggregator; + private readonly IAggregator _aggregator; private readonly EventQueryHandler _inner; + private readonly IDocumentSession _session; - public AggregationQueryHandler(Aggregator aggregator, EventQueryHandler inner) + public AggregationQueryHandler(IAggregator aggregator, EventQueryHandler inner, IDocumentSession session = null) { _aggregator = aggregator; _inner = inner; + _session = session; } public void ConfigureCommand(NpgsqlCommand command) @@ -31,14 +33,14 @@ public T Handle(DbDataReader reader, IIdentityMap map) { var @events = _inner.Handle(reader, map); - return _aggregator.Build(@events); + return _aggregator.Build(@events, _session); } public async Task HandleAsync(DbDataReader reader, IIdentityMap map, CancellationToken token) { var @events = await _inner.HandleAsync(reader, map, token).ConfigureAwait(false); - return _aggregator.Build(@events); + return _aggregator.Build(@events, _session); } } } \ No newline at end of file diff --git a/src/Marten/Events/Event.cs b/src/Marten/Events/Event.cs index c174ad140f..6696b6a921 100644 --- a/src/Marten/Events/Event.cs +++ b/src/Marten/Events/Event.cs @@ -9,7 +9,7 @@ public interface IEvent int Version { get; set; } object Data { get; } - void Apply(TAggregate state, Aggregator aggregator) + void Apply(TAggregate state, IAggregator aggregator) where TAggregate : class, new(); } @@ -26,7 +26,7 @@ public Event(T data) object IEvent.Data => Data; - public virtual void Apply(TAggregate state, Aggregator aggregator) + public virtual void Apply(TAggregate state, IAggregator aggregator) where TAggregate : class, new() { aggregator.AggregatorFor()?.Apply(state, Data); diff --git a/src/Marten/Events/EventGraph.cs b/src/Marten/Events/EventGraph.cs index 1d973e919a..c35160c145 100644 --- a/src/Marten/Events/EventGraph.cs +++ b/src/Marten/Events/EventGraph.cs @@ -208,11 +208,16 @@ IncludeJoin IDocumentMapping.JoinToInclude(JoinType joinType, ID throw new NotSupportedException(); } - public Aggregator AggregateFor() where T : class, new() + public void AggregateFor(IAggregator aggregator) where T : class, new() + { + _aggregates.AddOrUpdate(typeof (T), aggregator, (type, previous) => aggregator); + } + + public IAggregator AggregateFor() where T : class, new() { return _aggregates .GetOrAdd(typeof (T), type => new Aggregator()) - .As>(); + .As>(); } @@ -224,7 +229,7 @@ public Type AggregateTypeFor(string aggregateTypeName) }).AggregateType; } - public Aggregator AggregateStreamsInlineWith() where T : class, new() + public IAggregator AggregateStreamsInlineWith() where T : class, new() { var aggregator = AggregateFor(); var finder = new AggregateFinder(); diff --git a/src/Marten/Events/EventStore.cs b/src/Marten/Events/EventStore.cs index 98629a9d09..e591bba874 100644 --- a/src/Marten/Events/EventStore.cs +++ b/src/Marten/Events/EventStore.cs @@ -83,7 +83,7 @@ public Task> FetchStreamAsync(Guid streamId, int version = 0, Date { var inner = new EventQueryHandler(_selector, streamId, version, timestamp); var aggregator = _schema.Events.AggregateFor(); - var handler = new AggregationQueryHandler(aggregator, inner); + var handler = new AggregationQueryHandler(aggregator, inner, _session); return _connection.Fetch(handler, null); } @@ -93,7 +93,7 @@ public Task AggregateStreamAsync(Guid streamId, int version = 0, DateTime? { var inner = new EventQueryHandler(_selector, streamId, version, timestamp); var aggregator = _schema.Events.AggregateFor(); - var handler = new AggregationQueryHandler(aggregator, inner); + var handler = new AggregationQueryHandler(aggregator, inner, _session); return _connection.FetchAsync(handler, null, token); } diff --git a/src/Marten/Events/IEventStoreConfiguration.cs b/src/Marten/Events/IEventStoreConfiguration.cs index cf0c0122ad..b323628af0 100644 --- a/src/Marten/Events/IEventStoreConfiguration.cs +++ b/src/Marten/Events/IEventStoreConfiguration.cs @@ -17,9 +17,11 @@ public interface IEventStoreConfiguration IEnumerable AllAggregates(); EventMapping EventMappingFor(string eventType); bool IsActive { get; } - Aggregator AggregateFor() where T : class, new(); + + void AggregateFor(IAggregator aggregator) where T : class, new(); + IAggregator AggregateFor() where T : class, new(); Type AggregateTypeFor(string aggregateTypeName); - Aggregator AggregateStreamsInlineWith() where T : class, new(); + IAggregator AggregateStreamsInlineWith() where T : class, new(); void TransformEventsInlineWith(ITransform transform); void InlineTransformation(IProjection projection); diff --git a/src/Marten/Events/Projections/AggregationProjection.cs b/src/Marten/Events/Projections/AggregationProjection.cs index 92c78c90eb..ae9c2afcc0 100644 --- a/src/Marten/Events/Projections/AggregationProjection.cs +++ b/src/Marten/Events/Projections/AggregationProjection.cs @@ -9,9 +9,9 @@ namespace Marten.Events.Projections public class AggregationProjection : IProjection where T : class, new() { private readonly IAggregationFinder _finder; - private readonly Aggregator _aggregator; + private readonly IAggregator _aggregator; - public AggregationProjection(IAggregationFinder finder, Aggregator aggregator) + public AggregationProjection(IAggregationFinder finder, IAggregator aggregator) { _finder = finder; _aggregator = aggregator; @@ -52,6 +52,5 @@ public EventStream[] MatchingStreams(IDocumentSession session) return session.PendingChanges.AllChangedFor() .Where(_aggregator.AppliesTo).ToArray(); } - } } \ No newline at end of file diff --git a/src/Marten/Events/Projections/Aggregator.cs b/src/Marten/Events/Projections/Aggregator.cs index e8ec3ff39e..a3ffc1ce48 100644 --- a/src/Marten/Events/Projections/Aggregator.cs +++ b/src/Marten/Events/Projections/Aggregator.cs @@ -7,7 +7,7 @@ namespace Marten.Events.Projections { - public class Aggregator : IAggregator where T : class, new() + public class Aggregator : IAggregator where T : class, new() { public static readonly string ApplyMethod = "Apply"; @@ -34,7 +34,7 @@ public Aggregator() public string Alias { get; } - public T Build(IEnumerable events) + public T Build(IEnumerable events, IDocumentSession session) { var state = new T(); diff --git a/src/Marten/Events/Projections/IAggregator.cs b/src/Marten/Events/Projections/IAggregator.cs index 02e801f148..4775926eda 100644 --- a/src/Marten/Events/Projections/IAggregator.cs +++ b/src/Marten/Events/Projections/IAggregator.cs @@ -1,10 +1,18 @@ using System; +using System.Collections.Generic; namespace Marten.Events.Projections { - public interface IAggregator + public interface IAggregator { Type AggregateType { get; } string Alias { get; } + bool AppliesTo(EventStream stream); + } + + public interface IAggregator : IAggregator + { + IAggregation AggregatorFor(); + T Build(IEnumerable events, IDocumentSession session); } } \ No newline at end of file diff --git a/src/Marten/QuerySession.cs b/src/Marten/QuerySession.cs index 75514de933..c7922e108c 100644 --- a/src/Marten/QuerySession.cs +++ b/src/Marten/QuerySession.cs @@ -9,7 +9,6 @@ using Marten.Schema; using Marten.Services; using Marten.Services.BatchQuerying; -using Marten.Util; using Npgsql; using Remotion.Linq.Parsing.Structure; diff --git a/src/Marten/Services/BatchQuerying/BatchedQuery.cs b/src/Marten/Services/BatchQuerying/BatchedQuery.cs index 549a570c1b..d89ceb2750 100644 --- a/src/Marten/Services/BatchQuerying/BatchedQuery.cs +++ b/src/Marten/Services/BatchQuerying/BatchedQuery.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Linq.Expressions; using System.Threading; using System.Threading.Tasks; using Baseline;