diff --git a/docs/guide/messaging/partitioning.md b/docs/guide/messaging/partitioning.md index 4c6334de0..9b128095f 100644 --- a/docs/guide/messaging/partitioning.md +++ b/docs/guide/messaging/partitioning.md @@ -222,6 +222,37 @@ builder.UseWolverine(opts => snippet source | anchor +## Grouping by Property Name + +If your message contracts are auto-generated (e.g. from `.proto` files) and you cannot add a marker interface, +you can use the `ByPropertyNamed()` rule to look for a property by name on any message type. This is a built-in +`IGroupingRule` that inspects the incoming message type at runtime for a property matching one of the specified names +and uses its value as the `GroupId`. + +The first matching property name wins, and property values of any type are converted to `string` via `ToString()`. +Null property values result in `string.Empty`. If no matching property is found on a message type, the rule falls +through to the next rule in the chain. + +The property accessor is compiled via `LambdaBuilder` and memoized per message type for performance. + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.UseWolverine(opts => +{ + opts.MessagePartitioning + // Look for a property named "StreamId" or "Id" on the message type + // and use its value as the GroupId for partitioned processing. + // The first matching property name wins. + // This is particularly useful when message types are auto-generated + // (e.g. from .proto files) and cannot implement a marker interface. + .ByPropertyNamed("StreamId", "Id"); +}); +``` +snippet source | anchor + + ## Explicit Group Ids ::: tip diff --git a/src/Samples/DocumentationSamples/PartitioningSamples.cs b/src/Samples/DocumentationSamples/PartitioningSamples.cs index de54466d3..05fcb54ad 100644 --- a/src/Samples/DocumentationSamples/PartitioningSamples.cs +++ b/src/Samples/DocumentationSamples/PartitioningSamples.cs @@ -110,6 +110,25 @@ public static async Task configuring_message_grouping_rules() #endregion } + public static async Task configuring_by_property_name() + { + #region sample_configuring_by_property_name + + var builder = Host.CreateApplicationBuilder(); + builder.UseWolverine(opts => + { + opts.MessagePartitioning + // Look for a property named "StreamId" or "Id" on the message type + // and use its value as the GroupId for partitioned processing. + // The first matching property name wins. + // This is particularly useful when message types are auto-generated + // (e.g. from .proto files) and cannot implement a marker interface. + .ByPropertyNamed("StreamId", "Id"); + }); + + #endregion + } + #region sample_send_message_with_group_id public static async Task SendMessageToGroup(IMessageBus bus) diff --git a/src/Testing/CoreTests/Runtime/Partitioning/PropertyNameGroupingRuleTests.cs b/src/Testing/CoreTests/Runtime/Partitioning/PropertyNameGroupingRuleTests.cs new file mode 100644 index 000000000..cbe6c0824 --- /dev/null +++ b/src/Testing/CoreTests/Runtime/Partitioning/PropertyNameGroupingRuleTests.cs @@ -0,0 +1,168 @@ +using Wolverine.ComplianceTests; +using Wolverine.Runtime.Partitioning; +using Xunit; + +namespace CoreTests.Runtime.Partitioning; + +public class PropertyNameGroupingRuleTests +{ + [Fact] + public void match_string_property() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new StringIdMessage("abc-123"); + + rules.DetermineGroupId(envelope).ShouldBe("abc-123"); + } + + [Fact] + public void match_guid_property() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var id = Guid.NewGuid(); + var envelope = ObjectMother.Envelope(); + envelope.Message = new GuidIdMessage(id); + + rules.DetermineGroupId(envelope).ShouldBe(id.ToString()); + } + + [Fact] + public void match_int_property() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new IntIdMessage(42); + + rules.DetermineGroupId(envelope).ShouldBe("42"); + } + + [Fact] + public void match_long_property() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new LongIdMessage(9876543210L); + + rules.DetermineGroupId(envelope).ShouldBe("9876543210"); + } + + [Fact] + public void null_property_value_returns_empty_string() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new StringIdMessage(null!); + + rules.DetermineGroupId(envelope).ShouldBe(string.Empty); + } + + [Fact] + public void no_matching_property_returns_null() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new NoIdMessage("hello"); + + rules.DetermineGroupId(envelope).ShouldBeNull(); + } + + [Fact] + public void first_matching_property_name_wins() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("StreamId", "Id"); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new BothIdsMessage("stream-1", "id-2"); + + rules.DetermineGroupId(envelope).ShouldBe("stream-1"); + } + + [Fact] + public void falls_through_to_second_property_name() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("StreamId", "Id"); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new StringIdMessage("abc"); + + // StringIdMessage only has "Id", not "StreamId" + rules.DetermineGroupId(envelope).ShouldBe("abc"); + } + + [Fact] + public void memoizes_across_multiple_messages_of_same_type() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var envelope1 = ObjectMother.Envelope(); + envelope1.Message = new StringIdMessage("first"); + + var envelope2 = ObjectMother.Envelope(); + envelope2.Message = new StringIdMessage("second"); + + rules.DetermineGroupId(envelope1).ShouldBe("first"); + rules.DetermineGroupId(envelope2).ShouldBe("second"); + } + + [Fact] + public void works_with_different_message_types() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var guid = Guid.NewGuid(); + + var envelope1 = ObjectMother.Envelope(); + envelope1.Message = new StringIdMessage("abc"); + + var envelope2 = ObjectMother.Envelope(); + envelope2.Message = new IntIdMessage(42); + + var envelope3 = ObjectMother.Envelope(); + envelope3.Message = new GuidIdMessage(guid); + + var envelope4 = ObjectMother.Envelope(); + envelope4.Message = new LongIdMessage(999L); + + rules.DetermineGroupId(envelope1).ShouldBe("abc"); + rules.DetermineGroupId(envelope2).ShouldBe("42"); + rules.DetermineGroupId(envelope3).ShouldBe(guid.ToString()); + rules.DetermineGroupId(envelope4).ShouldBe("999"); + } + + [Fact] + public void explicit_group_id_takes_precedence() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var envelope = ObjectMother.Envelope(); + envelope.GroupId = "explicit"; + envelope.Message = new StringIdMessage("from-property"); + + rules.DetermineGroupId(envelope).ShouldBe("explicit"); + } +} + +public record StringIdMessage(string Id); +public record GuidIdMessage(Guid Id); +public record IntIdMessage(int Id); +public record LongIdMessage(long Id); +public record NoIdMessage(string Name); +public record BothIdsMessage(string StreamId, string Id); diff --git a/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs b/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs index 0e4ce5fbe..30605d210 100644 --- a/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs +++ b/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs @@ -77,6 +77,19 @@ public MessagePartitioningRules ByRule(IGroupingRule rule) return this; } + /// + /// Determine the GroupId of a message by looking for a property with a matching name + /// on the message type. This is useful when your message types are auto-generated (e.g. from + /// .proto files) and you cannot add a marker interface. The first matching property name wins. + /// Property values are converted to string via ToString(), with null values becoming string.Empty. + /// + /// The property names to look for on each message type + public MessagePartitioningRules ByPropertyNamed(params string[] propertyNames) + { + _rules.Add(new PropertyNameGroupingRule(propertyNames)); + return this; + } + /// /// Add a grouping rule based on a concrete message type and the property /// of the message type that exposes the group id information diff --git a/src/Wolverine/Runtime/Partitioning/PropertyNameGroupingRule.cs b/src/Wolverine/Runtime/Partitioning/PropertyNameGroupingRule.cs new file mode 100644 index 000000000..e0436bc4e --- /dev/null +++ b/src/Wolverine/Runtime/Partitioning/PropertyNameGroupingRule.cs @@ -0,0 +1,54 @@ +using ImTools; +using JasperFx.Core.Reflection; + +namespace Wolverine.Runtime.Partitioning; + +/// +/// A built-in IGroupingRule that determines the GroupId by looking for a named property +/// on the message type. Useful when you cannot add a marker interface to your message contracts +/// (e.g. auto-generated from .proto files) but still want partitioned sequential processing. +/// +internal class PropertyNameGroupingRule : IGroupingRule +{ + private readonly string[] _propertyNames; + private ImHashMap _groupers = ImHashMap.Empty; + + public PropertyNameGroupingRule(string[] propertyNames) + { + _propertyNames = propertyNames; + } + + public bool TryFindIdentity(Envelope envelope, out string groupId) + { + var messageType = envelope.Message!.GetType(); + + if (!_groupers.TryFind(messageType, out var grouper)) + { + grouper = TryBuildGrouper(messageType); + _groupers = _groupers.AddOrUpdate(messageType, grouper); + } + + if (grouper != null) + { + groupId = grouper.ToGroupId(envelope.Message); + return true; + } + + groupId = default!; + return false; + } + + private IGrouper? TryBuildGrouper(Type messageType) + { + foreach (var propertyName in _propertyNames) + { + var property = messageType.GetProperty(propertyName); + if (property != null) + { + return typeof(Grouper<,>).CloseAndBuildAs(property, messageType, property.PropertyType); + } + } + + return null; + } +}