Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/Testing/CoreTests/Runtime/Handlers/HandlerGraphTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,27 @@ public async Task register_message_type()
type.ShouldBe(typeof(DummyMessage));
}

[Fact]
public async Task register_message_type_with_multiple_aliases()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.RegisterMessageType(typeof(DummyMessage), "custom-alias-1");
opts.RegisterMessageType(typeof(DummyMessage), "custom-alias-2");
}).StartAsync();

var graph = host.Services.GetRequiredService<HandlerGraph>();

graph.TryFindMessageType(typeof(DummyMessage).ToMessageTypeName(), out _).ShouldBeFalse();

graph.TryFindMessageType("custom-alias-1", out var firstType).ShouldBeTrue();
firstType.ShouldBe(typeof(DummyMessage));

graph.TryFindMessageType("custom-alias-2", out var secondType).ShouldBeTrue();
secondType.ShouldBe(typeof(DummyMessage));
}

[Fact]
public async Task register_message_types_with_same_alias_throws_invalid_operation_exception()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System.Text;
using System.Text.Json;
using Wolverine.Runtime.Handlers;
using Wolverine.Runtime.Interop;
using Wolverine.Util;
using Xunit;

namespace CoreTests.Runtime.Interop;
Expand All @@ -14,6 +16,8 @@ public CloudEventsMapper_unknown_type_tests()
{
_handlers = new HandlerGraph();
_handlers.RegisterMessageType(typeof(ApproveOrder), "com.dapr.event.sent");
_handlers.RegisterMessageType(typeof(MultiAliasApproveOrder), "data.updated.v1");
_handlers.RegisterMessageType(typeof(MultiAliasApproveOrder), "data.updated.v2");

var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };
_mapper = new CloudEventsMapper(_handlers, options);
Expand Down Expand Up @@ -79,4 +83,53 @@ public void should_overwrite_raw_type_with_resolved_type_on_success()
envelope.MessageType.ShouldNotBeNull();
envelope.Message.ShouldBeOfType<ApproveOrder>();
}

[Fact]
public void should_fall_back_to_original_message_type_when_cloudevent_type_is_unknown()
{
var json = """
{
"data": { "orderId": 1 },
"id": "5929aaac-a5e2-4ca1-859c-edfe73f11565",
"specversion": "1.0",
"type": "some.unknown.event.v1",
"source": "test"
}
""";

var envelope = new Envelope
{
Data = Encoding.UTF8.GetBytes(json),
MessageType = typeof(FallbackApproveOrder).ToMessageTypeName()
};

var message = _mapper.ReadFromData(typeof(FallbackApproveOrder), envelope);

message.ShouldBeOfType<FallbackApproveOrder>().OrderId.ShouldBe(1);
envelope.Message.ShouldBeSameAs(message);
envelope.MessageType.ShouldBe(typeof(FallbackApproveOrder).ToMessageTypeName());
}

[Fact]
public void should_resolve_second_registered_alias_for_same_message_type()
{
var json = """
{
"data": { "orderId": 2 },
"id": "5929aaac-a5e2-4ca1-859c-edfe73f11565",
"specversion": "1.0",
"type": "data.updated.v2",
"source": "test"
}
""";

var envelope = new Envelope();
_mapper.MapIncoming(envelope, json);

envelope.Message.ShouldBeOfType<MultiAliasApproveOrder>().OrderId.ShouldBe(2);
envelope.MessageType.ShouldNotBe("data.updated.v2");
}
}

public record FallbackApproveOrder(int OrderId);
public record MultiAliasApproveOrder(int OrderId);
15 changes: 5 additions & 10 deletions src/Wolverine/Runtime/Handlers/HandlerGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -617,18 +617,13 @@ public void RegisterMessageType(Type messageType)

public void RegisterMessageType(Type messageType, string messageAlias)
{
if (_messageTypes.TryFind(messageAlias, out var type))
{
throw new InvalidOperationException($"Cannot register type {type} with alias {messageAlias} because alias is already used");
}

if (_replyTypes.Contains(messageType))
{
return;
}

lock (_messageTypesLock)
{
if (_messageTypes.TryFind(messageAlias, out var type))
{
throw new InvalidOperationException($"Cannot register type {type} with alias {messageAlias} because alias is already used");
}

_messageTypes = _messageTypes.AddOrUpdate(messageAlias, messageType);
_replyTypes = _replyTypes.Add(messageType);
}
Expand Down
21 changes: 16 additions & 5 deletions src/Wolverine/Runtime/Interop/CloudEventsMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public void MapIncoming(Envelope envelope, string json)
}

public void MapIncoming(Envelope envelope, JsonNode? node)
{
mapIncoming(envelope, node, fallbackType: null);
}

private void mapIncoming(Envelope envelope, JsonNode? node, Type? fallbackType)
{
if (node == null) return;

Expand All @@ -122,7 +127,7 @@ public void MapIncoming(Envelope envelope, JsonNode? node)
}
else if (message.GetValueKind() == JsonValueKind.Object)
{
MapIncoming(envelope, node["Message"]);
mapIncoming(envelope, node["Message"], fallbackType);
return;
}
}
Expand Down Expand Up @@ -165,16 +170,22 @@ public void MapIncoming(Envelope envelope, JsonNode? node)
// If resolution fails, the raw type survives for dead-letter persistence.
envelope.MessageType = cloudEventType;

if (_handlers.TryFindMessageType(cloudEventType, out var messageType))
// Resolve: try CloudEvent type alias first, then fall back to caller-provided
// type (e.g. from DefaultIncomingMessage<T> via ReadFromData)
var resolvedType = _handlers.TryFindMessageType(cloudEventType, out var messageType)
? messageType
: fallbackType;

if (resolvedType != null)
{
var data = node!["data"];
if (data != null)
{
envelope.Message = data.Deserialize(messageType, _options);
envelope.Message = data.Deserialize(resolvedType, _options);
}

// Overwrite with the canonical Wolverine message type name
envelope.MessageType = messageType.ToMessageTypeName();
envelope.MessageType = resolvedType.ToMessageTypeName();
}
else
{
Expand Down Expand Up @@ -205,7 +216,7 @@ public byte[] Write(Envelope envelope)
public object ReadFromData(Type messageType, Envelope envelope)
{
var node = JsonNode.Parse(envelope.Data);
MapIncoming(envelope, node);
mapIncoming(envelope, node, fallbackType: messageType);

return envelope.Message!;
}
Expand Down
Loading