Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Marten.Testing/Events/Projections/AggregatorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void build_a_series_of_events()
.Add(new MembersJoined {Members = new string[] {"Gandalf", "Boromir", "Gimli", "Legolas"}})
.Add(new MembersDeparted() {Members = new string[] {"Frodo", "Sam"}});

var party = theAggregator.Build(stream.Events);
var party = theAggregator.Build(stream.Events, null);

party.Name.ShouldBe("Destroy the Ring");

Expand Down
10 changes: 6 additions & 4 deletions src/Marten/Events/AggregationQueryHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ namespace Marten.Events
{
internal class AggregationQueryHandler<T> : IQueryHandler<T> where T : class, new()
{
private readonly Aggregator<T> _aggregator;
private readonly IAggregator<T> _aggregator;
private readonly EventQueryHandler _inner;
private readonly IDocumentSession _session;

public AggregationQueryHandler(Aggregator<T> aggregator, EventQueryHandler inner)
public AggregationQueryHandler(IAggregator<T> aggregator, EventQueryHandler inner, IDocumentSession session = null)
{
_aggregator = aggregator;
_inner = inner;
_session = session;
}

public void ConfigureCommand(NpgsqlCommand command)
Expand All @@ -31,14 +33,14 @@ public T Handle(DbDataReader reader, IIdentityMap map)
{
var @events = _inner.Handle(reader, map);

return _aggregator.Build(@events);
return _aggregator.Build(@events, _session);
}

public async Task<T> HandleAsync(DbDataReader reader, IIdentityMap map, CancellationToken token)
{
var @events = await _inner.HandleAsync(reader, map, token).ConfigureAwait(false);

return _aggregator.Build(@events);
return _aggregator.Build(@events, _session);
}
}
}
4 changes: 2 additions & 2 deletions src/Marten/Events/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public interface IEvent
int Version { get; set; }
object Data { get; }

void Apply<TAggregate>(TAggregate state, Aggregator<TAggregate> aggregator)
void Apply<TAggregate>(TAggregate state, IAggregator<TAggregate> aggregator)
where TAggregate : class, new();
}

Expand All @@ -26,7 +26,7 @@ public Event(T data)

object IEvent.Data => Data;

public virtual void Apply<TAggregate>(TAggregate state, Aggregator<TAggregate> aggregator)
public virtual void Apply<TAggregate>(TAggregate state, IAggregator<TAggregate> aggregator)
where TAggregate : class, new()
{
aggregator.AggregatorFor<T>()?.Apply(state, Data);
Expand Down
11 changes: 8 additions & 3 deletions src/Marten/Events/EventGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,16 @@ IncludeJoin<TOther> IDocumentMapping.JoinToInclude<TOther>(JoinType joinType, ID
throw new NotSupportedException();
}

public Aggregator<T> AggregateFor<T>() where T : class, new()
public void AggregateFor<T>(IAggregator<T> aggregator) where T : class, new()
{
_aggregates.AddOrUpdate(typeof (T), aggregator, (type, previous) => aggregator);
}

public IAggregator<T> AggregateFor<T>() where T : class, new()
{
return _aggregates
.GetOrAdd(typeof (T), type => new Aggregator<T>())
.As<Aggregator<T>>();
.As<IAggregator<T>>();
}


Expand All @@ -224,7 +229,7 @@ public Type AggregateTypeFor(string aggregateTypeName)
}).AggregateType;
}

public Aggregator<T> AggregateStreamsInlineWith<T>() where T : class, new()
public IAggregator<T> AggregateStreamsInlineWith<T>() where T : class, new()
{
var aggregator = AggregateFor<T>();
var finder = new AggregateFinder<T>();
Expand Down
4 changes: 2 additions & 2 deletions src/Marten/Events/EventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public Task<IList<IEvent>> FetchStreamAsync(Guid streamId, int version = 0, Date
{
var inner = new EventQueryHandler(_selector, streamId, version, timestamp);
var aggregator = _schema.Events.AggregateFor<T>();
var handler = new AggregationQueryHandler<T>(aggregator, inner);
var handler = new AggregationQueryHandler<T>(aggregator, inner, _session);

return _connection.Fetch(handler, null);
}
Expand All @@ -93,7 +93,7 @@ public Task<T> AggregateStreamAsync<T>(Guid streamId, int version = 0, DateTime?
{
var inner = new EventQueryHandler(_selector, streamId, version, timestamp);
var aggregator = _schema.Events.AggregateFor<T>();
var handler = new AggregationQueryHandler<T>(aggregator, inner);
var handler = new AggregationQueryHandler<T>(aggregator, inner, _session);

return _connection.FetchAsync(handler, null, token);
}
Expand Down
6 changes: 4 additions & 2 deletions src/Marten/Events/IEventStoreConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ public interface IEventStoreConfiguration
IEnumerable<IAggregator> AllAggregates();
EventMapping EventMappingFor(string eventType);
bool IsActive { get; }
Aggregator<T> AggregateFor<T>() where T : class, new();

void AggregateFor<T>(IAggregator<T> aggregator) where T : class, new();
IAggregator<T> AggregateFor<T>() where T : class, new();
Type AggregateTypeFor(string aggregateTypeName);
Aggregator<T> AggregateStreamsInlineWith<T>() where T : class, new();
IAggregator<T> AggregateStreamsInlineWith<T>() where T : class, new();

void TransformEventsInlineWith<TEvent, TView>(ITransform<TEvent, TView> transform);
void InlineTransformation(IProjection projection);
Expand Down
5 changes: 2 additions & 3 deletions src/Marten/Events/Projections/AggregationProjection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace Marten.Events.Projections
public class AggregationProjection<T> : IProjection where T : class, new()
{
private readonly IAggregationFinder<T> _finder;
private readonly Aggregator<T> _aggregator;
private readonly IAggregator<T> _aggregator;

public AggregationProjection(IAggregationFinder<T> finder, Aggregator<T> aggregator)
public AggregationProjection(IAggregationFinder<T> finder, IAggregator<T> aggregator)
{
_finder = finder;
_aggregator = aggregator;
Expand Down Expand Up @@ -52,6 +52,5 @@ public EventStream[] MatchingStreams(IDocumentSession session)
return session.PendingChanges.AllChangedFor<EventStream>()
.Where(_aggregator.AppliesTo).ToArray();
}

}
}
4 changes: 2 additions & 2 deletions src/Marten/Events/Projections/Aggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace Marten.Events.Projections
{
public class Aggregator<T> : IAggregator where T : class, new()
public class Aggregator<T> : IAggregator<T> where T : class, new()
{
public static readonly string ApplyMethod = "Apply";

Expand All @@ -34,7 +34,7 @@ public Aggregator()

public string Alias { get; }

public T Build(IEnumerable<IEvent> events)
public T Build(IEnumerable<IEvent> events, IDocumentSession session)
{
var state = new T();

Expand Down
10 changes: 9 additions & 1 deletion src/Marten/Events/Projections/IAggregator.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
using System;
using System.Collections.Generic;

namespace Marten.Events.Projections
{
public interface IAggregator
public interface IAggregator
{
Type AggregateType { get; }
string Alias { get; }
bool AppliesTo(EventStream stream);
}

public interface IAggregator<T> : IAggregator
{
IAggregation<T, TEvent> AggregatorFor<TEvent>();
T Build(IEnumerable<IEvent> events, IDocumentSession session);
}
}
1 change: 0 additions & 1 deletion src/Marten/QuerySession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using Marten.Schema;
using Marten.Services;
using Marten.Services.BatchQuerying;
using Marten.Util;
using Npgsql;
using Remotion.Linq.Parsing.Structure;

Expand Down
1 change: 0 additions & 1 deletion src/Marten/Services/BatchQuerying/BatchedQuery.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Baseline;
Expand Down