Skip to content

Commit

Permalink
Finally able to correctly forward messages to IEvent<T> when using th…
Browse files Browse the repository at this point in the history
…e event forwarding
  • Loading branch information
jeremydmiller committed May 16, 2024
1 parent da097b4 commit c679892
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 27 deletions.
126 changes: 126 additions & 0 deletions src/Persistence/MartenTests/Bugs/event_forwarding_bug.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
using IntegrationTests;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Marten;
using Marten.Events;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Runtime.Routing;
using Wolverine.Tracking;

namespace MartenTests.Bugs;

public class event_forwarding_bug
{
[Fact]
public async Task publish_ievent_of_t()
{
// The "Arrange"
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Policies.AutoApplyTransactions();

opts.Durability.Mode = DurabilityMode.Solo;

opts.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "forwarding";

m.Events.StreamIdentity = StreamIdentity.AsString;
m.Projections.LiveStreamAggregation<ShoppingList>();
}).UseLightweightSessions()
.IntegrateWithWolverine()
.EventForwardingToWolverine();;
}).StartAsync();

var runtime = host.GetRuntime();
var routing = runtime.RoutingFor(typeof(Event<ShoppingListCreated>));

// The "Act". This method is an extension method in Wolverine
// specifically for facilitating integration testing that should
// invoke the given message with Wolverine, then wait until all
// additional "work" is complete
var session = await host.InvokeMessageAndWaitAsync(new CreateShoppingList());

// And finally, just assert that a single message of
// type IEvent<ShoppingListCreated> was executed
session.Executed.SingleMessage<IEvent<ShoppingListCreated>>()
.ShouldNotBeNull();
}
}

public record AddShoppingListItem(string ShoppingListId, string ItemName);

public static class AddShoppingListItemHandler
{
public static async Task Handle(AddShoppingListItem command, IDocumentSession session)
{
var stream = await session.Events.FetchForWriting<ShoppingList>(command.ShoppingListId);
var shoppingList = stream.Aggregate;
if (shoppingList is null)
throw new InvalidOperationException("Shopping list does not exist");

if (shoppingList.Contains(command.ItemName))
throw new InvalidOperationException("Item is already in shopping list");

stream.AppendOne(new ShoppingListItemAdded(command.ItemName));
}
}

public record CreateShoppingList();

public static class CreateShoppingListHandler
{
public static string Handle(CreateShoppingList _, IDocumentSession session)
{
var shoppingListId = CombGuidIdGeneration.NewGuid().ToString();
session.Events.StartStream<ShoppingList>(shoppingListId, new ShoppingListCreated(shoppingListId));
return shoppingListId;
}
}

public static class IntegrationHandler
{
public static void Handle(IEvent<ShoppingListCreated> _)
{
// Don't need a body here, and I'll show why not
// next
}
}

public record ShoppingListCreated(string Id);

public record ShoppingListItemAdded(string ItemName);

public class ShoppingList
{
public string Id { get; init; } = null!;
private List<ShoppingListItem> Items { get; init; } = null!;

public bool Contains(string itemName) => Items.Any(item => item.Name == itemName);

public static ShoppingList Create(ShoppingListCreated _)
{
return new ShoppingList
{
Items = [],
};
}

public void Apply(ShoppingListItemAdded @event)
{
Items.Add(new ShoppingListItem
{
Name = @event.ItemName
});
}
}

public record ShoppingListItem
{
public required string Name { get; init; }
}
7 changes: 6 additions & 1 deletion src/Persistence/Wolverine.Marten/MartenIntegration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Wolverine.Postgresql.Transport;
using Wolverine.Runtime;
using Wolverine.Runtime.Routing;
using Wolverine.Util;

namespace Wolverine.Marten;

Expand Down Expand Up @@ -80,7 +81,11 @@ public IEnumerable<IMessageRoute> FindRoutes(Type messageType, IWolverineRuntime
if (messageType.IsConcrete())
{
var inner = runtime.RoutingFor(wrappedType);
innerRoutes = inner.Routes.OfType<MessageRoute>().ToArray();
innerRoutes = inner.Routes.Concat(new LocalRouting().FindRoutes(wrappedType, runtime)).OfType<MessageRoute>().ToArray();
}
else
{
innerRoutes = new LocalRouting().FindRoutes(wrappedType, runtime).OfType<MessageRoute>().ToArray();
}

// First look for explicit transformations
Expand Down
66 changes: 40 additions & 26 deletions src/Wolverine/Runtime/Handlers/HandlerGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,42 +135,56 @@ public void AddRange(IEnumerable<HandlerCall> calls)

throw new NotSupportedException();
}

if (_chains.TryFind(messageType, out var chain))
{
if (chain.Handler != null)
{
handler = chain.Handler;
}
else
{
lock (_compilingLock)
{
Debug.WriteLine("Starting to compile chain " + chain.MessageType.NameInCode());
if (chain.Handler == null)
{
chain.InitializeSynchronously(Rules, this, Container);
handler = chain.CreateHandler(Container!);
}
else
{
handler = chain.Handler;
}

Debug.WriteLine("Finished building the chain " + chain.MessageType.NameInCode());
}
}

_handlers = _handlers.AddOrUpdate(messageType, handler);
return resolveHandlerFromChain(messageType, chain);
}

return handler;
// This was to handle moving Event<T> to IEvent<T>
var candidates = _chains.Enumerate().Where(x => messageType.CanBeCastTo(x.Key)).ToArray();
if (candidates.Length == 1)
{
chain = candidates[0].Value;
return resolveHandlerFromChain(messageType, chain);
}

// memoize the "miss"
_handlers = _handlers.AddOrUpdate(messageType, null);
return null;
}

private IMessageHandler? resolveHandlerFromChain(Type messageType, HandlerChain chain)
{
IMessageHandler handler;
if (chain.Handler != null)
{
handler = chain.Handler;
}
else
{
lock (_compilingLock)
{
Debug.WriteLine("Starting to compile chain " + chain.MessageType.NameInCode());
if (chain.Handler == null)
{
chain.InitializeSynchronously(Rules, this, Container);
handler = chain.CreateHandler(Container!);
}
else
{
handler = chain.Handler;
}

Debug.WriteLine("Finished building the chain " + chain.MessageType.NameInCode());
}
}

_handlers = _handlers.AddOrUpdate(messageType, handler);

return handler;
}

internal void Compile(WolverineOptions options, IContainer container)
{
if (_hasCompiled)
Expand Down

0 comments on commit c679892

Please sign in to comment.