Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/DaemonTests/Aggregations/build_aggregate_projection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ public async Task simple_scenario()
await store.Advanced.Clean.DeleteAllEventDataAsync();

using var session = store.LightweightSession();
session.ForTenant("blue").Events.StartStream<SimpleEntity>("one", new AEvent(), new BEvent());
session.ForTenant("blue").Events.StartStream<SimpleEntity>("two", new BEvent(), new BEvent());
session.ForTenant("blue").Events.StartStream<SimpleEntity>("three", new AEvent(), new AEvent());
session.ForTenant("blue").Events.StartStream<SimpleEntity>("one", new MTAEvent(), new MTBEvent());
session.ForTenant("blue").Events.StartStream<SimpleEntity>("two", new MTBEvent(), new MTBEvent());
session.ForTenant("blue").Events.StartStream<SimpleEntity>("three", new MTAEvent(), new MTAEvent());

session.ForTenant("red").Events.StartStream<SimpleEntity>("one", new AEvent(), new BEvent(), new AEvent());
session.ForTenant("red").Events.StartStream<SimpleEntity>("two", new BEvent(), new BEvent(), new BEvent());
session.ForTenant("red").Events.StartStream<SimpleEntity>("five", new BEvent(), new BEvent(), new BEvent());
session.ForTenant("red").Events.StartStream<SimpleEntity>("one", new MTAEvent(), new MTBEvent(), new MTAEvent());
session.ForTenant("red").Events.StartStream<SimpleEntity>("two", new MTBEvent(), new MTBEvent(), new MTBEvent());
session.ForTenant("red").Events.StartStream<SimpleEntity>("five", new MTBEvent(), new MTBEvent(), new MTBEvent());

await session.SaveChangesAsync();

Expand Down Expand Up @@ -552,12 +552,12 @@ public class SimpleEntity: ITenanted

public string TenantId { get; set; }

public void Apply(AEvent _)
public void Apply(MTAEvent _)
{
A++;
}

public void Apply(BEvent _)
public void Apply(MTBEvent _)
{
B++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ public async Task start_as_inline_move_to_async_and_just_continue()
opts.Projections.Snapshot<SimpleAggregate>(SnapshotLifecycle.Inline);
});

var id1 = theSession.Events.StartStream<SimpleAggregate>(new AEvent(), new BEvent()).Id;
var id2 = theSession.Events.StartStream<SimpleAggregate>(new BEvent(), new CEvent()).Id;
var id3 = theSession.Events.StartStream<SimpleAggregate>(new CEvent(), new DEvent()).Id;
var id1 = theSession.Events.StartStream<SimpleAggregate>(new MTAEvent(), new MTBEvent()).Id;
var id2 = theSession.Events.StartStream<SimpleAggregate>(new MTBEvent(), new MTCEvent()).Id;
var id3 = theSession.Events.StartStream<SimpleAggregate>(new MTCEvent(), new MTDEvent()).Id;
await theSession.SaveChangesAsync();

var store2 = SeparateStore(opts =>
Expand All @@ -46,9 +46,9 @@ public async Task start_as_inline_move_to_async_and_just_continue()
await daemon.StartAllAsync();

using var session = store2.LightweightSession();
session.Events.Append(id1, new EEvent(), new EEvent());
session.Events.Append(id2, new EEvent(), new EEvent());
session.Events.Append(id3, new EEvent(), new EEvent());
session.Events.Append(id1, new MTEEvent(), new MTEEvent());
session.Events.Append(id2, new MTEEvent(), new MTEEvent());
session.Events.Append(id3, new MTEEvent(), new MTEEvent());
await session.SaveChangesAsync();

await daemon.WaitForNonStaleData(10.Seconds());
Expand Down Expand Up @@ -91,27 +91,27 @@ public class SimpleAggregate : IRevisioned
public int DCount { get; set; }
public int ECount { get; set; }

public void Apply(AEvent _)
public void Apply(MTAEvent _)
{
ACount++;
}

public void Apply(BEvent _)
public void Apply(MTBEvent _)
{
BCount++;
}

public void Apply(CEvent _)
public void Apply(MTCEvent _)
{
CCount++;
}

public void Apply(DEvent _)
public void Apply(MTDEvent _)
{
DCount++;
}

public void Apply(EEvent _)
public void Apply(MTEEvent _)
{
ECount++;
}
Expand Down
58 changes: 29 additions & 29 deletions src/DaemonTests/Aggregations/side_effects_in_aggregations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public async Task add_events_single_stream_guid_identifier()
await daemon.StartAllAsync();

var streamId = Guid.NewGuid();
theSession.Events.StartStream<SideEffects1>(streamId, new AEvent(),
new AEvent(), new AEvent());
theSession.Events.StartStream<SideEffects1>(streamId, new MTAEvent(),
new MTAEvent(), new MTAEvent());
await theSession.SaveChangesAsync();

await daemon.WaitForNonStaleData(30.Seconds());
Expand All @@ -54,7 +54,7 @@ public async Task add_events_single_stream_guid_identifier()
version1.A.ShouldBe(3);
version1.B.ShouldBe(0);

theSession.Events.Append(streamId, new AEvent(), new AEvent());
theSession.Events.Append(streamId, new MTAEvent(), new MTAEvent());
await theSession.SaveChangesAsync();


Expand Down Expand Up @@ -102,13 +102,13 @@ public async Task calls_side_effects_when_there_is_a_delete_event()
await daemon.StartAllAsync();

var streamId = Guid.NewGuid();
theSession.Events.StartStream<SideEffects1>(streamId, new AEvent(),
new AEvent(), new AEvent());
theSession.Events.StartStream<SideEffects1>(streamId, new MTAEvent(),
new MTAEvent(), new MTAEvent());
await theSession.SaveChangesAsync();

await daemon.WaitForNonStaleData(30.Seconds());

theSession.Events.Append(streamId, new EEvent());
theSession.Events.Append(streamId, new MTEEvent());
await theSession.SaveChangesAsync();

await daemon.WaitForNonStaleData(30.Seconds());
Expand Down Expand Up @@ -141,8 +141,8 @@ public async Task add_events_single_stream_guid_identifier_when_starting_a_strea
await daemon.StartAllAsync();

var streamId = Guid.NewGuid();
theSession.Events.StartStream<SideEffects1>(streamId, new AEvent(),
new AEvent(), new AEvent(), new AEvent(), new AEvent());
theSession.Events.StartStream<SideEffects1>(streamId, new MTAEvent(),
new MTAEvent(), new MTAEvent(), new MTAEvent(), new MTAEvent());
await theSession.SaveChangesAsync();

// Prove the BEevent side effect happened as expected
Expand Down Expand Up @@ -187,8 +187,8 @@ public async Task add_events_single_stream_string_identifier_when_starting_a_str
await daemon.StartAllAsync();

var streamKey = Guid.NewGuid().ToString();
theSession.Events.StartStream<SideEffects2>(streamKey, new AEvent(),
new AEvent(), new AEvent(), new AEvent(), new AEvent());
theSession.Events.StartStream<SideEffects2>(streamKey, new MTAEvent(),
new MTAEvent(), new MTAEvent(), new MTAEvent(), new MTAEvent());
await theSession.SaveChangesAsync();

// Prove the BEevent side effect happened as expected
Expand Down Expand Up @@ -233,8 +233,8 @@ public async Task add_events_single_stream_string_identifier()
await daemon.StartAllAsync();

var streamKey = Guid.NewGuid().ToString();
theSession.Events.StartStream<SideEffects2>(streamKey, new AEvent(),
new AEvent(), new AEvent());
theSession.Events.StartStream<SideEffects2>(streamKey, new MTAEvent(),
new MTAEvent(), new MTAEvent());
await theSession.SaveChangesAsync();

await daemon.WaitForNonStaleData(30.Seconds());
Expand All @@ -243,7 +243,7 @@ public async Task add_events_single_stream_string_identifier()
version1.A.ShouldBe(3);
version1.B.ShouldBe(0);

theSession.Events.Append(streamKey, new AEvent(), new AEvent());
theSession.Events.Append(streamKey, new MTAEvent(), new MTAEvent());
await theSession.SaveChangesAsync();


Expand Down Expand Up @@ -290,8 +290,8 @@ public async Task side_effects_do_not_happen_in_rebuilds()
await daemon.StartAllAsync();

var streamKey = Guid.NewGuid().ToString();
theSession.Events.StartStream<SideEffects2>(streamKey, new AEvent(),
new AEvent(), new AEvent(), new AEvent(), new AEvent());
theSession.Events.StartStream<SideEffects2>(streamKey, new MTAEvent(),
new MTAEvent(), new MTAEvent(), new MTAEvent(), new MTAEvent());
await theSession.SaveChangesAsync();

// Prove the BEevent side effect happened as expected
Expand Down Expand Up @@ -341,9 +341,9 @@ public async Task publishing_messages_in_continuous_mode()
var stream2 = Guid.NewGuid();
var stream3 = Guid.NewGuid();

theSession.Events.StartStream<SideEffects1>(stream1, new AEvent(), new AEvent());
theSession.Events.StartStream<SideEffects1>(stream2, new AEvent(), new AEvent());
theSession.Events.StartStream<SideEffects1>(stream3, new AEvent(), new AEvent(), new BEvent());
theSession.Events.StartStream<SideEffects1>(stream1, new MTAEvent(), new MTAEvent());
theSession.Events.StartStream<SideEffects1>(stream2, new MTAEvent(), new MTAEvent());
theSession.Events.StartStream<SideEffects1>(stream3, new MTAEvent(), new MTAEvent(), new MTBEvent());
await theSession.SaveChangesAsync();

await daemon.WaitForNonStaleData(120.Seconds());
Expand All @@ -363,15 +363,15 @@ public class Projection1: SingleStreamProjection<SideEffects1, Guid>
{
public Projection1()
{
DeleteEvent<EEvent>();
DeleteEvent<MTEEvent>();
}

public void Apply(SideEffects1 aggregate, AEvent _)
public void Apply(SideEffects1 aggregate, MTAEvent _)
{
aggregate.A++;
}

public void Apply(SideEffects1 aggregate, BEvent _)
public void Apply(SideEffects1 aggregate, MTBEvent _)
{
aggregate.B++;
}
Expand All @@ -380,10 +380,10 @@ public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEven
{
if (slice.Snapshot?.A % 5 == 0)
{
slice.AppendEvent(new BEvent());
slice.AppendEvent(new MTBEvent());
}

if (slice.Events().OfType<IEvent<EEvent>>().Any())
if (slice.Events().OfType<IEvent<MTEEvent>>().Any())
{
slice.PublishMessage(new WasDeleted(slice.Events().First().StreamId));
}
Expand All @@ -407,12 +407,12 @@ public record WasDeleted(Guid Id);

public class Projection2: SingleStreamProjection<SideEffects2, string>
{
public void Apply(SideEffects2 aggregate, AEvent _)
public void Apply(SideEffects2 aggregate, MTAEvent _)
{
aggregate.A++;
}

public void Apply(SideEffects2 aggregate, BEvent _)
public void Apply(SideEffects2 aggregate, MTBEvent _)
{
aggregate.B++;
}
Expand All @@ -421,7 +421,7 @@ public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEven
{
if (slice.Snapshot.A >= 5 && slice.Snapshot.B == 0)
{
slice.AppendEvent(new BEvent());
slice.AppendEvent(new MTBEvent());
}

return new ValueTask();
Expand All @@ -430,19 +430,19 @@ public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEven

public class Projection3: SingleStreamProjection<SideEffects1, Guid>
{
public void Apply(SideEffects1 aggregate, AEvent _)
public void Apply(SideEffects1 aggregate, MTAEvent _)
{
aggregate.A++;
}

public void Apply(SideEffects1 aggregate, BEvent _)
public void Apply(SideEffects1 aggregate, MTBEvent _)
{

}

public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice<SideEffects1> slice)
{
if (slice.Snapshot != null && slice.Events().OfType<IEvent<BEvent>>().Any())
if (slice.Snapshot != null && slice.Events().OfType<IEvent<MTBEvent>>().Any())
{
slice.PublishMessage(new GotB(slice.Snapshot.Id));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ public async Task do_not_blow_up()

using (var session = theStore.LightweightSession())
{
session.Events.Append(Guid.NewGuid(), new BEvent(), new CEvent(), new DEvent());
session.Events.Append(Guid.NewGuid(), new BEvent(), new CEvent(), new DEvent());
session.Events.Append(Guid.NewGuid(), new BEvent(), new CEvent(), new DEvent());
session.Events.Append(Guid.NewGuid(), new BEvent(), new CEvent(), new DEvent());
session.Events.Append(Guid.NewGuid(), new BEvent(), new CEvent(), new DEvent());
session.Events.Append(Guid.NewGuid(), new MTBEvent(), new MTCEvent(), new MTDEvent());
session.Events.Append(Guid.NewGuid(), new MTBEvent(), new MTCEvent(), new MTDEvent());
session.Events.Append(Guid.NewGuid(), new MTBEvent(), new MTCEvent(), new MTDEvent());
session.Events.Append(Guid.NewGuid(), new MTBEvent(), new MTCEvent(), new MTDEvent());
session.Events.Append(Guid.NewGuid(), new MTBEvent(), new MTCEvent(), new MTDEvent());

await session.SaveChangesAsync();
}
Expand Down Expand Up @@ -55,15 +55,15 @@ public class UsesAEventOnly
public Guid Id { get; set; }
public int Count { get; set; }

public void Apply(AEvent e) => Count++;
public void Apply(MTAEvent e) => Count++;
}

public class OtherAggregate
{
public Guid Id { get; set; }
public int Count { get; set; }

public void Apply(AEvent e) => Count++;
public void Apply(BEvent e) => Count++;
public void Apply(CEvent e) => Count++;
public void Apply(MTAEvent e) => Count++;
public void Apply(MTBEvent e) => Count++;
public void Apply(MTCEvent e) => Count++;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ public class MismatchedIdentityProjection : MultiStreamProjection<Target, string
{
public MismatchedIdentityProjection()
{
Identity<IEvent<AEvent>>(c => c.TenantId);
Identity<IEvent<MTAEvent>>(c => c.TenantId);
}

public void Apply(Target state, IEvent<AEvent> e) => state.Number++;
public void Apply(Target state, IEvent<MTAEvent> e) => state.Number++;
}


Expand Down
14 changes: 7 additions & 7 deletions src/DaemonTests/EventProjections/using_patches_in_async_mode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ public async Task do_some_patching()
var id2 = Guid.NewGuid();
var id3 = Guid.NewGuid();

theSession.Events.StartStream<SimpleAggregate>(id1, new StartAggregate(), new AEvent(), new AEvent(), new BEvent());
theSession.Events.StartStream<SimpleAggregate>(id2, new StartAggregate(), new AEvent(), new CEvent(), new CEvent());
theSession.Events.StartStream<SimpleAggregate>(id3, new StartAggregate(), new BEvent(), new BEvent(), new BEvent(), new CEvent());
theSession.Events.StartStream<SimpleAggregate>(id1, new StartAggregate(), new MTAEvent(), new MTAEvent(), new MTBEvent());
theSession.Events.StartStream<SimpleAggregate>(id2, new StartAggregate(), new MTAEvent(), new MTCEvent(), new MTCEvent());
theSession.Events.StartStream<SimpleAggregate>(id3, new StartAggregate(), new MTBEvent(), new MTBEvent(), new MTBEvent(), new MTCEvent());

for (int i = 0; i < 100; i++)
{
theSession.Events.StartStream<SimpleAggregate>(new StartAggregate(), new AEvent(), new AEvent(), new BEvent());
theSession.Events.StartStream<SimpleAggregate>(new StartAggregate(), new MTAEvent(), new MTAEvent(), new MTBEvent());
}

await theSession.SaveChangesAsync();
Expand All @@ -60,17 +60,17 @@ public class LetterPatcher: EventProjection
{
public SimpleAggregate Transform(IEvent<StartAggregate> e) => new SimpleAggregate { Id = e.StreamId };

public void Project(IEvent<AEvent> e, IDocumentOperations ops)
public void Project(IEvent<MTAEvent> e, IDocumentOperations ops)
{
ops.Patch<SimpleAggregate>(e.StreamId).Increment(x => x.ACount);
}

public void Project(IEvent<BEvent> e, IDocumentOperations ops)
public void Project(IEvent<MTBEvent> e, IDocumentOperations ops)
{
ops.Patch<SimpleAggregate>(e.StreamId).Increment(x => x.BCount);
}

public void Project(IEvent<CEvent> e, IDocumentOperations ops)
public void Project(IEvent<MTCEvent> e, IDocumentOperations ops)
{
ops.Patch<SimpleAggregate>(e.StreamId).Increment(x => x.CCount);
}
Expand Down
Loading
Loading