From 65cbc19d167c61f18e67133a62ab62cb1851da89 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 2 Mar 2026 11:05:31 -0600 Subject: [PATCH] Add ByPropertyNamed grouping rule for partitioned sequential messaging. Closes #2228 Built-in IGroupingRule that discovers a named property on message types at runtime and uses its value as the GroupId. Property accessors are compiled via LambdaBuilder and memoized per message type. Supports string, int, Guid, long, and any property type via ToString(). Co-Authored-By: Claude Opus 4.6 --- docs/guide/messaging/partitioning.md | 31 ++++ .../PartitioningSamples.cs | 19 ++ .../PropertyNameGroupingRuleTests.cs | 168 ++++++++++++++++++ .../Partitioning/MessagePartitioningRules.cs | 13 ++ .../Partitioning/PropertyNameGroupingRule.cs | 54 ++++++ 5 files changed, 285 insertions(+) create mode 100644 src/Testing/CoreTests/Runtime/Partitioning/PropertyNameGroupingRuleTests.cs create mode 100644 src/Wolverine/Runtime/Partitioning/PropertyNameGroupingRule.cs diff --git a/docs/guide/messaging/partitioning.md b/docs/guide/messaging/partitioning.md index 4c6334de0..9b128095f 100644 --- a/docs/guide/messaging/partitioning.md +++ b/docs/guide/messaging/partitioning.md @@ -222,6 +222,37 @@ builder.UseWolverine(opts => snippet source | anchor +## Grouping by Property Name + +If your message contracts are auto-generated (e.g. from `.proto` files) and you cannot add a marker interface, +you can use the `ByPropertyNamed()` rule to look for a property by name on any message type. This is a built-in +`IGroupingRule` that inspects the incoming message type at runtime for a property matching one of the specified names +and uses its value as the `GroupId`. + +The first matching property name wins, and property values of any type are converted to `string` via `ToString()`. +Null property values result in `string.Empty`. If no matching property is found on a message type, the rule falls +through to the next rule in the chain. + +The property accessor is compiled via `LambdaBuilder` and memoized per message type for performance. + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.UseWolverine(opts => +{ + opts.MessagePartitioning + // Look for a property named "StreamId" or "Id" on the message type + // and use its value as the GroupId for partitioned processing. + // The first matching property name wins. + // This is particularly useful when message types are auto-generated + // (e.g. from .proto files) and cannot implement a marker interface. + .ByPropertyNamed("StreamId", "Id"); +}); +``` +snippet source | anchor + + ## Explicit Group Ids ::: tip diff --git a/src/Samples/DocumentationSamples/PartitioningSamples.cs b/src/Samples/DocumentationSamples/PartitioningSamples.cs index de54466d3..05fcb54ad 100644 --- a/src/Samples/DocumentationSamples/PartitioningSamples.cs +++ b/src/Samples/DocumentationSamples/PartitioningSamples.cs @@ -110,6 +110,25 @@ public static async Task configuring_message_grouping_rules() #endregion } + public static async Task configuring_by_property_name() + { + #region sample_configuring_by_property_name + + var builder = Host.CreateApplicationBuilder(); + builder.UseWolverine(opts => + { + opts.MessagePartitioning + // Look for a property named "StreamId" or "Id" on the message type + // and use its value as the GroupId for partitioned processing. + // The first matching property name wins. + // This is particularly useful when message types are auto-generated + // (e.g. from .proto files) and cannot implement a marker interface. + .ByPropertyNamed("StreamId", "Id"); + }); + + #endregion + } + #region sample_send_message_with_group_id public static async Task SendMessageToGroup(IMessageBus bus) diff --git a/src/Testing/CoreTests/Runtime/Partitioning/PropertyNameGroupingRuleTests.cs b/src/Testing/CoreTests/Runtime/Partitioning/PropertyNameGroupingRuleTests.cs new file mode 100644 index 000000000..cbe6c0824 --- /dev/null +++ b/src/Testing/CoreTests/Runtime/Partitioning/PropertyNameGroupingRuleTests.cs @@ -0,0 +1,168 @@ +using Wolverine.ComplianceTests; +using Wolverine.Runtime.Partitioning; +using Xunit; + +namespace CoreTests.Runtime.Partitioning; + +public class PropertyNameGroupingRuleTests +{ + [Fact] + public void match_string_property() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new StringIdMessage("abc-123"); + + rules.DetermineGroupId(envelope).ShouldBe("abc-123"); + } + + [Fact] + public void match_guid_property() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var id = Guid.NewGuid(); + var envelope = ObjectMother.Envelope(); + envelope.Message = new GuidIdMessage(id); + + rules.DetermineGroupId(envelope).ShouldBe(id.ToString()); + } + + [Fact] + public void match_int_property() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new IntIdMessage(42); + + rules.DetermineGroupId(envelope).ShouldBe("42"); + } + + [Fact] + public void match_long_property() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new LongIdMessage(9876543210L); + + rules.DetermineGroupId(envelope).ShouldBe("9876543210"); + } + + [Fact] + public void null_property_value_returns_empty_string() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new StringIdMessage(null!); + + rules.DetermineGroupId(envelope).ShouldBe(string.Empty); + } + + [Fact] + public void no_matching_property_returns_null() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new NoIdMessage("hello"); + + rules.DetermineGroupId(envelope).ShouldBeNull(); + } + + [Fact] + public void first_matching_property_name_wins() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("StreamId", "Id"); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new BothIdsMessage("stream-1", "id-2"); + + rules.DetermineGroupId(envelope).ShouldBe("stream-1"); + } + + [Fact] + public void falls_through_to_second_property_name() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("StreamId", "Id"); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new StringIdMessage("abc"); + + // StringIdMessage only has "Id", not "StreamId" + rules.DetermineGroupId(envelope).ShouldBe("abc"); + } + + [Fact] + public void memoizes_across_multiple_messages_of_same_type() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var envelope1 = ObjectMother.Envelope(); + envelope1.Message = new StringIdMessage("first"); + + var envelope2 = ObjectMother.Envelope(); + envelope2.Message = new StringIdMessage("second"); + + rules.DetermineGroupId(envelope1).ShouldBe("first"); + rules.DetermineGroupId(envelope2).ShouldBe("second"); + } + + [Fact] + public void works_with_different_message_types() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var guid = Guid.NewGuid(); + + var envelope1 = ObjectMother.Envelope(); + envelope1.Message = new StringIdMessage("abc"); + + var envelope2 = ObjectMother.Envelope(); + envelope2.Message = new IntIdMessage(42); + + var envelope3 = ObjectMother.Envelope(); + envelope3.Message = new GuidIdMessage(guid); + + var envelope4 = ObjectMother.Envelope(); + envelope4.Message = new LongIdMessage(999L); + + rules.DetermineGroupId(envelope1).ShouldBe("abc"); + rules.DetermineGroupId(envelope2).ShouldBe("42"); + rules.DetermineGroupId(envelope3).ShouldBe(guid.ToString()); + rules.DetermineGroupId(envelope4).ShouldBe("999"); + } + + [Fact] + public void explicit_group_id_takes_precedence() + { + var rules = new MessagePartitioningRules(new()); + rules.ByPropertyNamed("Id"); + + var envelope = ObjectMother.Envelope(); + envelope.GroupId = "explicit"; + envelope.Message = new StringIdMessage("from-property"); + + rules.DetermineGroupId(envelope).ShouldBe("explicit"); + } +} + +public record StringIdMessage(string Id); +public record GuidIdMessage(Guid Id); +public record IntIdMessage(int Id); +public record LongIdMessage(long Id); +public record NoIdMessage(string Name); +public record BothIdsMessage(string StreamId, string Id); diff --git a/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs b/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs index 0e4ce5fbe..30605d210 100644 --- a/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs +++ b/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs @@ -77,6 +77,19 @@ public MessagePartitioningRules ByRule(IGroupingRule rule) return this; } + /// + /// Determine the GroupId of a message by looking for a property with a matching name + /// on the message type. This is useful when your message types are auto-generated (e.g. from + /// .proto files) and you cannot add a marker interface. The first matching property name wins. + /// Property values are converted to string via ToString(), with null values becoming string.Empty. + /// + /// The property names to look for on each message type + public MessagePartitioningRules ByPropertyNamed(params string[] propertyNames) + { + _rules.Add(new PropertyNameGroupingRule(propertyNames)); + return this; + } + /// /// Add a grouping rule based on a concrete message type and the property /// of the message type that exposes the group id information diff --git a/src/Wolverine/Runtime/Partitioning/PropertyNameGroupingRule.cs b/src/Wolverine/Runtime/Partitioning/PropertyNameGroupingRule.cs new file mode 100644 index 000000000..e0436bc4e --- /dev/null +++ b/src/Wolverine/Runtime/Partitioning/PropertyNameGroupingRule.cs @@ -0,0 +1,54 @@ +using ImTools; +using JasperFx.Core.Reflection; + +namespace Wolverine.Runtime.Partitioning; + +/// +/// A built-in IGroupingRule that determines the GroupId by looking for a named property +/// on the message type. Useful when you cannot add a marker interface to your message contracts +/// (e.g. auto-generated from .proto files) but still want partitioned sequential processing. +/// +internal class PropertyNameGroupingRule : IGroupingRule +{ + private readonly string[] _propertyNames; + private ImHashMap _groupers = ImHashMap.Empty; + + public PropertyNameGroupingRule(string[] propertyNames) + { + _propertyNames = propertyNames; + } + + public bool TryFindIdentity(Envelope envelope, out string groupId) + { + var messageType = envelope.Message!.GetType(); + + if (!_groupers.TryFind(messageType, out var grouper)) + { + grouper = TryBuildGrouper(messageType); + _groupers = _groupers.AddOrUpdate(messageType, grouper); + } + + if (grouper != null) + { + groupId = grouper.ToGroupId(envelope.Message); + return true; + } + + groupId = default!; + return false; + } + + private IGrouper? TryBuildGrouper(Type messageType) + { + foreach (var propertyName in _propertyNames) + { + var property = messageType.GetProperty(propertyName); + if (property != null) + { + return typeof(Grouper<,>).CloseAndBuildAs(property, messageType, property.PropertyType); + } + } + + return null; + } +}