Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/Http/Wolverine.Http.Marten/AggregateAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public AggregateAttribute(string routeOrParameterName)

public override Variable Modify(HttpChain chain, ParameterInfo parameter, IServiceContainer container)
{
if (chain.Method.Method.GetParameters().Where(x => x.HasAttribute<AggregateAttribute>()).Count() > 1)
if (chain.Method.Method.GetParameters().Count(x => x.HasAttribute<AggregateAttribute>()) > 1)
{
throw new InvalidOperationException(
"It is only possible (today) to use a single [Aggregate] attribute on an HTTP handler method. Maybe use [ReadAggregate] if all you need is the projected data");
Expand All @@ -63,8 +63,13 @@ public override Variable Modify(HttpChain chain, ParameterInfo parameter, IServi
chain.Metadata.Produces(404);

AggregateType = parameter.ParameterType;
if (AggregateType.IsNullable())
{
AggregateType = AggregateType.GetInnerTypeFromNullable();
}

var store = container.GetInstance<IDocumentStore>();
var idType = store.Options.Events.StreamIdentity == StreamIdentity.AsGuid ? typeof(Guid) : typeof(string);
var idType = store.Options.FindOrResolveDocumentType(AggregateType).IdType;

IdVariable = FindRouteVariable(idType, chain);
if (IdVariable == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
using JasperFx.CodeGeneration;
using Marten.Schema;
using NSubstitute;
using Shouldly;
using Wolverine.Configuration;
using Wolverine.Marten;
using Wolverine.Runtime.Handlers;
using Shouldly;
using Wolverine.Codegen;
using Wolverine.Runtime;
using Wolverine.Runtime.Handlers;

namespace MartenTests;
namespace MartenTests.AggregateHandlerWorkflow;

public class AggregateHandlerAttributeTests
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
using JasperFx.CodeGeneration;
using JasperFx.Core;
using JasperFx.Events;
using JasperFx.Resources;
using Marten;
using Marten.Events;
using Marten.Events.Projections;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using JasperFx.Resources;
using Shouldly;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Tracking;

namespace MartenTests;
namespace MartenTests.AggregateHandlerWorkflow;

public class aggregate_handler_workflow: PostgresqlContext, IAsyncLifetime
{
Expand Down Expand Up @@ -440,4 +440,5 @@ public static Response For(LetterAggregate aggregate)
public int BCount { get; set; }
public int CCount { get; set; }
public int DCount { get; set; }
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
using IntegrationTests;
using JasperFx;
using JasperFx.CodeGeneration;
using JasperFx.Core.Reflection;
using JasperFx.Events;
using JasperFx.Events.Projections;
using JasperFx.Resources;
using Marten;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Runtime;
using Wolverine.Tracking;
using Xunit.Abstractions;

namespace MartenTests.AggregateHandlerWorkflow;

public class aggregate_handler_workflow_with_ievent
{
private readonly ITestOutputHelper _output;

public aggregate_handler_workflow_with_ievent(ITestOutputHelper output)
{
_output = output;
}

[Fact]
public async Task use_ievent_as_Guid_id()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Discovery.DisableConventionalDiscovery().IncludeType(typeof(AEventHandler)).IncludeType(typeof(RaiseLetterHandler));

opts.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.Projections.Snapshot<LetterAggregate>(SnapshotLifecycle.Inline);

m.DisableNpgsqlLogging = true;
})
.UseLightweightSessions()
.IntegrateWithWolverine(x =>
{
x.UseFastEventForwarding = true;
});

opts.Policies.AutoApplyTransactions();


opts.Services.AddResourceSetupOnStartup();
}).StartAsync();

var store = host.DocumentStore();
using var session = store.LightweightSession();

var streamId = Guid.NewGuid();

session.Events.StartStream<LetterAggregate>(streamId, new AEvent(), new BEvent());
await session.SaveChangesAsync();

var tracked = await host.InvokeMessageAndWaitAsync(new RaiseABC(streamId));

tracked.Executed.SingleEnvelope<IEvent<AEvent>>().ShouldNotBeNull();

var doc = await session.LoadAsync<LetterAggregate>(streamId);
doc.DCount.ShouldBe(1);
}

[Fact]
public async Task using_string_as_stream_key()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Discovery.DisableConventionalDiscovery().IncludeType(typeof(StringIdentifiedHandler));

opts.Services.AddMarten(m =>
{
m.Events.StreamIdentity = StreamIdentity.AsString;

m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "as_string";
m.Projections.Add<LetterCountsByStringProjection>(ProjectionLifecycle.Inline);

m.DisableNpgsqlLogging = true;
})
.UseLightweightSessions()
.IntegrateWithWolverine(x =>
{
x.UseFastEventForwarding = true;
});

opts.Policies.AutoApplyTransactions();


opts.Services.AddResourceSetupOnStartup();
}).StartAsync();

var store = host.DocumentStore();
using var session = store.LightweightSession();

var streamKey = Guid.NewGuid().ToString();

var tracked = await host.InvokeMessageAndWaitAsync(new StartLetterCountsByString(streamKey));

tracked.Executed.SingleEnvelope<IEvent<AEvent>>().ShouldNotBeNull();

var doc = await session.LoadAsync<LetterCountsByString>(streamKey);
doc.DCount.ShouldBe(1);
}


}

public static class AEventHandler
{


[AggregateHandler]
public static DEvent Handle(IEvent<AEvent> _, LetterAggregate aggregate)
{
return new DEvent();
}
}

public class LetterCountsByString: IRevisioned
{
public string Id { get; set; }
public int ACount { get; set; }
public int BCount { get; set; }
public int CCount { get; set; }
public int DCount { get; set; }
public int Version { get; set; }
}

public record StartLetterCountsByString(string StreamKey);

public static class StringIdentifiedHandler
{
public static IStartStream Handle(StartLetterCountsByString command)
{
return MartenOps.StartStream<LetterCountsByString>(command.StreamKey, new AEvent(), new BEvent(), new CEvent());
}

[AggregateHandler]
public static DEvent Handle(IEvent<AEvent> e, LetterCountsByString aggregate)
{
aggregate.Id.ShouldBe(e.StreamKey);
return new DEvent();
}
}

public class LetterCountsByStringProjection: SingleStreamProjection<LetterCountsByString, string>
{
public override LetterCountsByString Evolve(LetterCountsByString snapshot, string id, IEvent e)
{
snapshot ??= new LetterCountsByString { Id = id };

switch (e.Data)
{
case AEvent _:
snapshot.ACount++;
break;

case BEvent _:
snapshot.BCount++;
break;

case CEvent _:
snapshot.CCount++;
break;

case DEvent _:
snapshot.DCount++;
break;
}

return snapshot;
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
using IntegrationTests;
using JasperFx;
using JasperFx.CodeGeneration;
using JasperFx.Resources;
using Marten;
using Marten.Events;
using Marten.Events.Projections;
using Marten.Exceptions;
using Marten.Internal.Sessions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using JasperFx.Resources;
using Shouldly;
using Wolverine.ComplianceTests;
using Wolverine.Attributes;
using Wolverine.ComplianceTests;
using Wolverine.Marten;
using Wolverine.Tracking;

namespace MartenTests;
namespace MartenTests.AggregateHandlerWorkflow;

public class marten_command_workflow_middleware : PostgresqlContext, IDisposable
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// <auto-generated/>
#pragma warning disable
using MartenTests.AggregateHandlerWorkflow;
using Wolverine.Marten.Publishing;

namespace Internal.Generated.WolverineHandlers
Expand All @@ -19,18 +20,18 @@ public Event3Handler1609469393(Wolverine.Marten.Publishing.OutboxedSessionFactor
public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation)
{
// The actual message body
var event3 = (MartenTests.Event3)context.Envelope.Message;
var event3 = (Event3)context.Envelope.Message;

await using var documentSession = _outboxedSessionFactory.OpenSession(context);
var eventStore = documentSession.Events;
var aggregateId = event3.AggregateId;

// Loading Marten aggregate
var eventStream = await eventStore.FetchForWriting<MartenTests.Aggregate>(aggregateId, cancellation).ConfigureAwait(false);
var eventStream = await eventStore.FetchForWriting<Aggregate>(aggregateId, cancellation).ConfigureAwait(false);


// The actual message execution
var outgoing1 = MartenTests.FooHandler.Handle(event3, eventStream.Aggregate);
var outgoing1 = FooHandler.Handle(event3, eventStream.Aggregate);


// Outgoing, cascaded message
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// <auto-generated/>
#pragma warning disable
using MartenTests.AggregateHandlerWorkflow;
using Wolverine.Marten.Publishing;

namespace Internal.Generated.WolverineHandlers
Expand All @@ -19,18 +20,18 @@ public IncrementABHandler79726094(Wolverine.Marten.Publishing.OutboxedSessionFac
public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation)
{
// The actual message body
var incrementAB = (MartenTests.IncrementAB)context.Envelope.Message;
var incrementAB = (IncrementAB)context.Envelope.Message;

await using var documentSession = _outboxedSessionFactory.OpenSession(context);
var eventStore = documentSession.Events;
var aggregateId = incrementAB.LetterAggregateId;

// Loading Marten aggregate
var eventStream = await eventStore.FetchForExclusiveWriting<MartenTests.LetterAggregate>(aggregateId, cancellation).ConfigureAwait(false);
var eventStream = await eventStore.FetchForExclusiveWriting<LetterAggregate>(aggregateId, cancellation).ConfigureAwait(false);


// The actual message execution
var outgoing1 = MartenTests.SpecialLetterHandler.Handle(incrementAB, eventStream.Aggregate);
var outgoing1 = SpecialLetterHandler.Handle(incrementAB, eventStream.Aggregate);

if (outgoing1 != null)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// <auto-generated/>
#pragma warning disable
using MartenTests.AggregateHandlerWorkflow;
using Wolverine.Marten.Publishing;

namespace Internal.Generated.WolverineHandlers
Expand All @@ -19,16 +20,16 @@ public IncrementAHandler1658474384(Wolverine.Marten.Publishing.OutboxedSessionFa
public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation)
{
// The actual message body
var incrementA = (MartenTests.IncrementA)context.Envelope.Message;
var incrementA = (IncrementA)context.Envelope.Message;

await using var documentSession = _outboxedSessionFactory.OpenSession(context);
var eventStore = documentSession.Events;
var aggregateId = incrementA.LetterAggregateId;

// Loading Marten aggregate
var eventStream = await eventStore.FetchForWriting<MartenTests.LetterAggregate>(aggregateId, cancellation).ConfigureAwait(false);
var eventStream = await eventStore.FetchForWriting<LetterAggregate>(aggregateId, cancellation).ConfigureAwait(false);

var letterAggregateHandler = new MartenTests.LetterAggregateHandler();
var letterAggregateHandler = new LetterAggregateHandler();

// The actual message execution
var outgoing1 = letterAggregateHandler.Handle(incrementA, eventStream.Aggregate, documentSession);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// <auto-generated/>
#pragma warning disable
using MartenTests.AggregateHandlerWorkflow;
using Wolverine.Marten.Publishing;

namespace Internal.Generated.WolverineHandlers
Expand All @@ -19,16 +20,16 @@ public IncrementBCHandler483010622(Wolverine.Marten.Publishing.OutboxedSessionFa
public override async System.Threading.Tasks.Task HandleAsync(Wolverine.Runtime.MessageContext context, System.Threading.CancellationToken cancellation)
{
// The actual message body
var incrementBC = (MartenTests.IncrementBC)context.Envelope.Message;
var incrementBC = (IncrementBC)context.Envelope.Message;

await using var documentSession = _outboxedSessionFactory.OpenSession(context);
var eventStore = documentSession.Events;
var aggregateId = incrementBC.LetterAggregateId;

// Loading Marten aggregate
var eventStream = await eventStore.FetchForWriting<MartenTests.LetterAggregate>(aggregateId, incrementBC.Version, cancellation).ConfigureAwait(false);
var eventStream = await eventStore.FetchForWriting<LetterAggregate>(aggregateId, incrementBC.Version, cancellation).ConfigureAwait(false);

var letterAggregateHandler = new MartenTests.LetterAggregateHandler();
var letterAggregateHandler = new LetterAggregateHandler();

// The actual message execution
var outgoing1 = letterAggregateHandler.Handle(incrementBC, eventStream.Aggregate);
Expand Down
Loading
Loading