Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ const config: UserConfig<DefaultTheme.Config> = {
{text: 'Endpoint Specific Operations', link: '/guide/messaging/endpoint-operations'},
{text: 'Broadcast to a Specific Topic', link: '/guide/messaging/broadcast-to-topic'},
{text: 'Message Expiration', link: '/guide/messaging/expiration'},
{text: 'Header Propagation', link: '/guide/messaging/header-propagation'},
{text: 'Endpoint Policies', link: '/guide/messaging/policies'},
{text: 'Sending Error Handling', link: '/guide/messaging/sending-error-handling'}
]
Expand Down
14 changes: 14 additions & 0 deletions docs/guide/messaging/header-propagation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Header Propagation

When consuming messages from external systems, those messages may carry custom headers that need to flow through to any downstream messages your handlers produce. Use `PropagateIncomingHeadersToOutgoing` to declare which headers should be forwarded automatically:

```csharp
builder.Host.UseWolverine(opts =>
{
opts.Policies.PropagateIncomingHeadersToOutgoing("x-correlation-id", "x-source-system");
});
```

When a handler receives a message carrying any of the named headers, Wolverine will copy those headers onto every outgoing message cascaded within that handler context. Headers not present on the incoming message are silently skipped.

This works across all transports — Kafka, RabbitMQ, Azure Service Bus, or any other. The headers must be present on the incoming `Envelope` at the point the handler runs. Wolverine's default envelope mappers only carry Wolverine's own metadata headers, so if you need to propagate custom headers from an external producer you will need a custom envelope mapper that explicitly reads those headers from the transport message and sets them on the envelope.
70 changes: 70 additions & 0 deletions src/Testing/CoreTests/PropagateHeadersRuleTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using NSubstitute;
using Wolverine.ComplianceTests;
using Xunit;

namespace CoreTests;

public class PropagateHeadersRuleTests
{
[Fact]
public void modify_is_a_no_op_outside_of_a_handler_context()
{
var rule = new PropagateHeadersRule(["x-custom"]);
var envelope = ObjectMother.Envelope();

rule.Modify(envelope);

envelope.Headers.ContainsKey("x-custom").ShouldBeFalse();
}

[Fact]
public void copies_named_headers_from_incoming_to_outgoing()
{
var rule = new PropagateHeadersRule(["x-custom", "x-other"]);

var incoming = ObjectMother.Envelope();
incoming.Headers["x-custom"] = "custom-value";
incoming.Headers["x-other"] = "other-value";

var context = Substitute.For<IMessageContext>();
context.Envelope.Returns(incoming);

var outgoing = ObjectMother.Envelope();
rule.ApplyCorrelation(context, outgoing);

outgoing.Headers["x-custom"].ShouldBe("custom-value");
outgoing.Headers["x-other"].ShouldBe("other-value");
}

[Fact]
public void headers_not_present_on_incoming_are_silently_skipped()
{
var rule = new PropagateHeadersRule(["x-present", "x-missing"]);

var incoming = ObjectMother.Envelope();
incoming.Headers["x-present"] = "value";

var context = Substitute.For<IMessageContext>();
context.Envelope.Returns(incoming);

var outgoing = ObjectMother.Envelope();
rule.ApplyCorrelation(context, outgoing);

outgoing.Headers["x-present"].ShouldBe("value");
outgoing.Headers.ContainsKey("x-missing").ShouldBeFalse();
}

[Fact]
public void no_op_when_there_is_no_incoming_envelope()
{
var rule = new PropagateHeadersRule(["x-custom"]);

var context = Substitute.For<IMessageContext>();
context.Envelope.Returns((Envelope?)null);

var outgoing = ObjectMother.Envelope();
rule.ApplyCorrelation(context, outgoing);

outgoing.Headers.ContainsKey("x-custom").ShouldBeFalse();
}
}
27 changes: 27 additions & 0 deletions src/Wolverine/IEnvelopeRule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,33 @@ public void ApplyCorrelation(IMessageContext originator, Envelope outgoing)
}
}

internal class PropagateHeadersRule : IEnvelopeRule
{
private readonly string[] _headerNames;

public PropagateHeadersRule(string[] headerNames)
{
_headerNames = headerNames;
}

// No incoming context available outside a handler — nothing to propagate
public void Modify(Envelope envelope) { }

public void ApplyCorrelation(IMessageContext originator, Envelope outgoing)
{
var incoming = originator.Envelope;
if (incoming is null) return;

foreach (var name in _headerNames)
{
if (incoming.Headers.TryGetValue(name, out var value))
{
outgoing.Headers[name] = value;
}
}
}
}

internal class LambdaEnvelopeRule : IEnvelopeRule
{
private readonly Action<Envelope> _configure;
Expand Down
7 changes: 7 additions & 0 deletions src/Wolverine/IPolicies.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,11 @@ public interface IPolicies : IEnumerable<IWolverinePolicy>, IWithFailurePolicies
/// specifying DeliveryOptions on every outgoing message.
/// </summary>
void PropagateGroupIdToPartitionKey();

/// <summary>
/// Automatically propagate the named headers from an incoming message to all outgoing
/// messages cascaded within the same handler context. Headers not present on the incoming
/// message are silently skipped.
/// </summary>
void PropagateIncomingHeadersToOutgoing(params string[] headerNames);
}
8 changes: 8 additions & 0 deletions src/Wolverine/WolverineOptions.Policies.cs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,14 @@ void IPolicies.PropagateGroupIdToPartitionKey()
MetadataRules.Add(new GroupIdToPartitionKeyRule());
}

void IPolicies.PropagateIncomingHeadersToOutgoing(params string[] headerNames)
{
if (headerNames == null || headerNames.Length == 0)
throw new ArgumentException("At least one header name is required", nameof(headerNames));

MetadataRules.Add(new PropagateHeadersRule(headerNames));
}

internal MiddlewarePolicy FindOrCreateMiddlewarePolicy()
{
var policy = RegisteredPolicies.OfType<MiddlewarePolicy>().FirstOrDefault();
Expand Down
Loading