Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions docs/guide/messaging/partitioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,37 @@ builder.UseWolverine(opts =>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Samples/DocumentationSamples/PartitioningSamples.cs#L86-L110' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_message_grouping_rules' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Grouping by Property Name <Badge type="tip" text="5.17" />

If your message contracts are auto-generated (e.g. from `.proto` files) and you cannot add a marker interface,
you can use the `ByPropertyNamed()` rule to look for a property by name on any message type. This is a built-in
`IGroupingRule` that inspects the incoming message type at runtime for a property matching one of the specified names
and uses its value as the `GroupId`.

The first matching property name wins, and property values of any type are converted to `string` via `ToString()`.
Null property values result in `string.Empty`. If no matching property is found on a message type, the rule falls
through to the next rule in the chain.

The property accessor is compiled via `LambdaBuilder` and memoized per message type for performance.

<!-- snippet: sample_configuring_by_property_name -->
<a id='snippet-sample_configuring_by_property_name'></a>
```cs
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
opts.MessagePartitioning
// Look for a property named "StreamId" or "Id" on the message type
// and use its value as the GroupId for partitioned processing.
// The first matching property name wins.
// This is particularly useful when message types are auto-generated
// (e.g. from .proto files) and cannot implement a marker interface.
.ByPropertyNamed("StreamId", "Id");
});
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Samples/DocumentationSamples/PartitioningSamples.cs#L113-L127' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_by_property_name' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Explicit Group Ids

::: tip
Expand Down
19 changes: 19 additions & 0 deletions src/Samples/DocumentationSamples/PartitioningSamples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,25 @@ public static async Task configuring_message_grouping_rules()
#endregion
}

public static async Task configuring_by_property_name()
{
#region sample_configuring_by_property_name

var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
opts.MessagePartitioning
// Look for a property named "StreamId" or "Id" on the message type
// and use its value as the GroupId for partitioned processing.
// The first matching property name wins.
// This is particularly useful when message types are auto-generated
// (e.g. from .proto files) and cannot implement a marker interface.
.ByPropertyNamed("StreamId", "Id");
});

#endregion
}

#region sample_send_message_with_group_id

public static async Task SendMessageToGroup(IMessageBus bus)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
using Wolverine.ComplianceTests;
using Wolverine.Runtime.Partitioning;
using Xunit;

namespace CoreTests.Runtime.Partitioning;

public class PropertyNameGroupingRuleTests
{
[Fact]
public void match_string_property()
{
var rules = new MessagePartitioningRules(new());
rules.ByPropertyNamed("Id");

var envelope = ObjectMother.Envelope();
envelope.Message = new StringIdMessage("abc-123");

rules.DetermineGroupId(envelope).ShouldBe("abc-123");
}

[Fact]
public void match_guid_property()
{
var rules = new MessagePartitioningRules(new());
rules.ByPropertyNamed("Id");

var id = Guid.NewGuid();
var envelope = ObjectMother.Envelope();
envelope.Message = new GuidIdMessage(id);

rules.DetermineGroupId(envelope).ShouldBe(id.ToString());
}

[Fact]
public void match_int_property()
{
var rules = new MessagePartitioningRules(new());
rules.ByPropertyNamed("Id");

var envelope = ObjectMother.Envelope();
envelope.Message = new IntIdMessage(42);

rules.DetermineGroupId(envelope).ShouldBe("42");
}

[Fact]
public void match_long_property()
{
var rules = new MessagePartitioningRules(new());
rules.ByPropertyNamed("Id");

var envelope = ObjectMother.Envelope();
envelope.Message = new LongIdMessage(9876543210L);

rules.DetermineGroupId(envelope).ShouldBe("9876543210");
}

[Fact]
public void null_property_value_returns_empty_string()
{
var rules = new MessagePartitioningRules(new());
rules.ByPropertyNamed("Id");

var envelope = ObjectMother.Envelope();
envelope.Message = new StringIdMessage(null!);

rules.DetermineGroupId(envelope).ShouldBe(string.Empty);
}

[Fact]
public void no_matching_property_returns_null()
{
var rules = new MessagePartitioningRules(new());
rules.ByPropertyNamed("Id");

var envelope = ObjectMother.Envelope();
envelope.Message = new NoIdMessage("hello");

rules.DetermineGroupId(envelope).ShouldBeNull();
}

[Fact]
public void first_matching_property_name_wins()
{
var rules = new MessagePartitioningRules(new());
rules.ByPropertyNamed("StreamId", "Id");

var envelope = ObjectMother.Envelope();
envelope.Message = new BothIdsMessage("stream-1", "id-2");

rules.DetermineGroupId(envelope).ShouldBe("stream-1");
}

[Fact]
public void falls_through_to_second_property_name()
{
var rules = new MessagePartitioningRules(new());
rules.ByPropertyNamed("StreamId", "Id");

var envelope = ObjectMother.Envelope();
envelope.Message = new StringIdMessage("abc");

// StringIdMessage only has "Id", not "StreamId"
rules.DetermineGroupId(envelope).ShouldBe("abc");
}

[Fact]
public void memoizes_across_multiple_messages_of_same_type()
{
var rules = new MessagePartitioningRules(new());
rules.ByPropertyNamed("Id");

var envelope1 = ObjectMother.Envelope();
envelope1.Message = new StringIdMessage("first");

var envelope2 = ObjectMother.Envelope();
envelope2.Message = new StringIdMessage("second");

rules.DetermineGroupId(envelope1).ShouldBe("first");
rules.DetermineGroupId(envelope2).ShouldBe("second");
}

[Fact]
public void works_with_different_message_types()
{
var rules = new MessagePartitioningRules(new());
rules.ByPropertyNamed("Id");

var guid = Guid.NewGuid();

var envelope1 = ObjectMother.Envelope();
envelope1.Message = new StringIdMessage("abc");

var envelope2 = ObjectMother.Envelope();
envelope2.Message = new IntIdMessage(42);

var envelope3 = ObjectMother.Envelope();
envelope3.Message = new GuidIdMessage(guid);

var envelope4 = ObjectMother.Envelope();
envelope4.Message = new LongIdMessage(999L);

rules.DetermineGroupId(envelope1).ShouldBe("abc");
rules.DetermineGroupId(envelope2).ShouldBe("42");
rules.DetermineGroupId(envelope3).ShouldBe(guid.ToString());
rules.DetermineGroupId(envelope4).ShouldBe("999");
}

[Fact]
public void explicit_group_id_takes_precedence()
{
var rules = new MessagePartitioningRules(new());
rules.ByPropertyNamed("Id");

var envelope = ObjectMother.Envelope();
envelope.GroupId = "explicit";
envelope.Message = new StringIdMessage("from-property");

rules.DetermineGroupId(envelope).ShouldBe("explicit");
}
}

public record StringIdMessage(string Id);
public record GuidIdMessage(Guid Id);
public record IntIdMessage(int Id);
public record LongIdMessage(long Id);
public record NoIdMessage(string Name);
public record BothIdsMessage(string StreamId, string Id);
13 changes: 13 additions & 0 deletions src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ public MessagePartitioningRules ByRule(IGroupingRule rule)
return this;
}

/// <summary>
/// Determine the GroupId of a message by looking for a property with a matching name
/// on the message type. This is useful when your message types are auto-generated (e.g. from
/// .proto files) and you cannot add a marker interface. The first matching property name wins.
/// Property values are converted to string via ToString(), with null values becoming string.Empty.
/// </summary>
/// <param name="propertyNames">The property names to look for on each message type</param>
public MessagePartitioningRules ByPropertyNamed(params string[] propertyNames)
{
_rules.Add(new PropertyNameGroupingRule(propertyNames));
return this;
}

/// <summary>
/// Add a grouping rule based on a concrete message type and the property
/// of the message type that exposes the group id information
Expand Down
54 changes: 54 additions & 0 deletions src/Wolverine/Runtime/Partitioning/PropertyNameGroupingRule.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using ImTools;
using JasperFx.Core.Reflection;

namespace Wolverine.Runtime.Partitioning;

/// <summary>
/// A built-in IGroupingRule that determines the GroupId by looking for a named property
/// on the message type. Useful when you cannot add a marker interface to your message contracts
/// (e.g. auto-generated from .proto files) but still want partitioned sequential processing.
/// </summary>
internal class PropertyNameGroupingRule : IGroupingRule
{
private readonly string[] _propertyNames;
private ImHashMap<Type, IGrouper?> _groupers = ImHashMap<Type, IGrouper?>.Empty;

public PropertyNameGroupingRule(string[] propertyNames)
{
_propertyNames = propertyNames;
}

public bool TryFindIdentity(Envelope envelope, out string groupId)
{
var messageType = envelope.Message!.GetType();

if (!_groupers.TryFind(messageType, out var grouper))
{
grouper = TryBuildGrouper(messageType);
_groupers = _groupers.AddOrUpdate(messageType, grouper);
}

if (grouper != null)
{
groupId = grouper.ToGroupId(envelope.Message);
return true;
}

groupId = default!;
return false;
}

private IGrouper? TryBuildGrouper(Type messageType)
{
foreach (var propertyName in _propertyNames)
{
var property = messageType.GetProperty(propertyName);
if (property != null)
{
return typeof(Grouper<,>).CloseAndBuildAs<IGrouper>(property, messageType, property.PropertyType);
}
}

return null;
}
}
Loading