Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@
<PackageVersion Include="Grpc.Tools" Version="2.76.0" />
<PackageVersion Include="HtmlTags" Version="9.0.0" />
<PackageVersion Include="JasperFx" Version="1.26.0" />
<PackageVersion Include="JasperFx.Events" Version="1.28.1" />
<PackageVersion Include="JasperFx.Events" Version="1.29.0" />
<PackageVersion Include="JasperFx.RuntimeCompiler" Version="4.4.0" />
<PackageVersion Include="JasperFx.SourceGeneration" Version="1.1.0" />
<PackageVersion Include="Lamar.Microsoft.DependencyInjection" Version="15.0.1" />
<PackageVersion Include="Marten" Version="8.31.0" />
<PackageVersion Include="Marten" Version="8.32.0" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="6.1.3" />
<PackageVersion Include="Polecat" Version="1.6.1" />
<PackageVersion Include="Microsoft.Azure.Cosmos" Version="3.46.1" />
<PackageVersion Include="Marten.AspNetCore" Version="8.31.0" />
<PackageVersion Include="Marten.AspNetCore" Version="8.32.0" />
<PackageVersion Include="MemoryPack" Version="1.21.3" />
<PackageVersion Include="MessagePack" Version="3.1.3" />
<PackageVersion Include="Meziantou.Extensions.Logging.Xunit" Version="1.0.15" />
Expand Down Expand Up @@ -100,13 +100,13 @@
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.5" />
<PackageVersion Include="System.Net.NameResolution" Version="4.3.0" />
<PackageVersion Include="System.Threading.Tasks.Dataflow" Version="9.0.5" />
<PackageVersion Include="Weasel.Core" Version="8.14.0" />
<PackageVersion Include="Weasel.EntityFrameworkCore" Version="8.14.0" />
<PackageVersion Include="Weasel.MySql" Version="8.14.0" />
<PackageVersion Include="Weasel.Oracle" Version="8.14.0" />
<PackageVersion Include="Weasel.Postgresql" Version="8.14.0" />
<PackageVersion Include="Weasel.SqlServer" Version="8.14.0" />
<PackageVersion Include="Weasel.Sqlite" Version="8.14.0" />
<PackageVersion Include="Weasel.Core" Version="8.14.1" />
<PackageVersion Include="Weasel.EntityFrameworkCore" Version="8.14.1" />
<PackageVersion Include="Weasel.MySql" Version="8.14.1" />
<PackageVersion Include="Weasel.Oracle" Version="8.14.1" />
<PackageVersion Include="Weasel.Postgresql" Version="8.14.1" />
<PackageVersion Include="Weasel.SqlServer" Version="8.14.1" />
<PackageVersion Include="Weasel.Sqlite" Version="8.14.1" />
<PackageVersion Include="xunit" Version="2.9.3" />
<PackageVersion Include="xunit.assemblyfixture" Version="2.2.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
Expand Down
50 changes: 50 additions & 0 deletions docs/guide/durability/marten/event-forwarding.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,53 @@ public static Task HandleAsync(SecondMessage message, IDocumentSession session)
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/MartenTests/event_streaming.cs#L219-L225' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_execution_of_forwarded_events_second_message_to_fourth_event' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Overriding Side-Effect Message Metadata <Badge type="tip" text="5.x" />

When a Marten projection publishes a Wolverine message from `RaiseSideEffects` via `slice.PublishMessage(...)`, the resulting Wolverine envelope is built by an internal `MessageContext` that has no inherent knowledge of the originating event's metadata. By default the outgoing message ends up with no correlation id, no causation id, and an envelope-level conversation id rooted at its own envelope id — which means the chain Event A → side-effect command → Event B does not naturally share a correlation id.

The metadata-aware overload of `PublishMessage` (JasperFx.Events 1.29+) lets the projection author override the per-message metadata that the side-effect command's envelope (and the Marten session opened for its handler) will inherit:

```csharp
public class TodoCloserProjection : MultiStreamProjection<TodoTask, Guid>
{
public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice<TodoTask> slice)
{
if (slice.Snapshot is null) return ValueTask.CompletedTask;

// Carry the originating event's correlation id (and optionally its
// causation id) onto the command we're emitting, so the handler that
// closes the task can match against it.
var correlationId = slice.Events()
.Select(e => e.CorrelationId)
.FirstOrDefault(id => id is not null);

slice.PublishMessage(
new CloseTodoTask(slice.Snapshot.Id),
new MessageMetadata(slice.TenantId)
{
CorrelationId = correlationId,
CausationId = slice.Events().Last().Id.ToString()
});

return ValueTask.CompletedTask;
}
}
```

What the override actually does inside Wolverine:

| `MessageMetadata` field | Effect on the outgoing envelope | Effect on the receiving handler |
|---|---|---|
| `TenantId` | `envelope.TenantId` (also drives transport routing for tenanted endpoints) | `IMessageContext.TenantId`, scoped `DbContext`/`IDocumentSession` tenant |
| `CorrelationId` | `envelope.CorrelationId` (passes through `MessageBus.TrackEnvelopeCorrelation` without being clobbered) | `IMessageContext.CorrelationId`, `session.CorrelationId` (Marten) |
| `CausationId` | Stored as `envelope.Headers["causation-id"]` because Wolverine's native `Envelope.ConversationId` is `Guid`-typed and would lose information | `session.CausationId` (Marten) — `OutboxedSessionFactory` reads the header in preference to the default `ConversationId.ToString()` chain |
| `Headers` | Each entry copied onto `envelope.Headers` (string-converted) | Available via `envelope.Headers` on the receiving side |

The metadata-less form (`slice.PublishMessage(message)`) is unchanged and remains the right call when you don't need per-message overrides.

### When you actually need this

The motivating case is a "todo-list" projection: Event A opens a task keyed by some correlation id, Event B (emitted by the handler of a side-effect command) closes the task with the same key. Without the metadata override, Event B carries a null correlation id and the close never matches.

The same shape applies to anywhere you want a chain of derived events/commands to share a business-meaningful identifier — distributed tracing keyed on `correlation-id`, idempotency keys, tenant-scoped audit threading, etc. Pick the metadata fields that match your business need; you don't have to set all of them.
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
using IntegrationTests;
using JasperFx;
using JasperFx.Core;
using JasperFx.Events;
using JasperFx.Events.Daemon;
using JasperFx.Events.Grouping;
using JasperFx.Events.Projections;
using Marten;
using Marten.Events.Aggregation;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Tracking;

namespace MartenTests.Bugs;

/// <summary>
/// GH-2545: when a Marten projection publishes a side-effect message via
/// <see cref="IEventSlice{T}.PublishMessage(object, MessageMetadata)"/>, the
/// resulting Wolverine envelope — and the Marten <c>IDocumentSession</c> that
/// the handler opens for that envelope — must inherit the user-supplied
/// <see cref="MessageMetadata.CorrelationId"/> and
/// <see cref="MessageMetadata.CausationId"/>. This is the "todo-list" pattern
/// the issue describes: Event A opens a task keyed by a correlation id,
/// Event B (emitted by the handler of a side-effect command) closes the
/// task with the same correlation id.
/// </summary>
public class Bug_2545_raise_side_effects_with_metadata_override
{
[Fact]
public async Task metadata_on_PublishMessage_flows_to_handler_context_and_marten_session()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Durability.Mode = DurabilityMode.Solo;

opts.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "bug_2545";
m.Projections.Add<Bug2545Projection>(ProjectionLifecycle.Async);
m.DisableNpgsqlLogging = true;
})
.IntegrateWithWolverine()
.AddAsyncDaemon(DaemonMode.Solo);

opts.Policies.UseDurableLocalQueues();

opts.Discovery.DisableConventionalDiscovery()
.IncludeType(typeof(Bug2545SideEffectHandler));
}).StartAsync();

var streamId = Guid.NewGuid();
Bug2545SideEffectHandler.Received.Clear();

// Clear any prior state from earlier test runs.
var store = host.Services.GetRequiredService<IDocumentStore>();
await store.Advanced.Clean.CompletelyRemoveAllAsync();

var tracked = await host
.TrackActivity()
.Timeout(30.Seconds())
.WaitForMessageToBeReceivedAt<Bug2545Command>(host)
.ExecuteAndWaitAsync((Func<IMessageContext, Task>)(async _ =>
{
await using var session = store.LightweightSession();
session.Events.StartStream<Bug2545Aggregate>(streamId, new Bug2545Triggered());
await session.SaveChangesAsync();
}));

tracked.Executed.SingleMessage<Bug2545Command>().ShouldNotBeNull();

// 1) The handler observed the user-supplied correlation id.
Bug2545SideEffectHandler.ObservedCorrelationId.ShouldBe(Bug2545Projection.CorrelationId);

// 2) The handler's MessageContext rolled the causation override from
// the envelope header onto the IDocumentSession, so any events the
// handler had appended would carry CausationId = the user override
// rather than Wolverine's default Guid-typed ConversationId string.
Bug2545SideEffectHandler.ObservedSessionCausationId.ShouldBe(Bug2545Projection.CausationId);

// 3) And CorrelationId on the session matches (parity with the default
// OutboxedSessionFactory behavior on the non-override path).
Bug2545SideEffectHandler.ObservedSessionCorrelationId.ShouldBe(Bug2545Projection.CorrelationId);
}
}

public record Bug2545Triggered;

public record Bug2545Command(Guid AggregateId);

public class Bug2545Aggregate
{
public Guid Id { get; set; }

public static Bug2545Aggregate Create(Bug2545Triggered _) => new();
}

public class Bug2545Projection : SingleStreamProjection<Bug2545Aggregate, Guid>
{
public const string CorrelationId = "todo-correlation-42";
public const string CausationId = "caused-by-triggered-event";

public static Bug2545Aggregate Create(Bug2545Triggered _) => new();

public override ValueTask RaiseSideEffects(
Marten.IDocumentOperations operations,
IEventSlice<Bug2545Aggregate> slice)
{
if (slice.Snapshot is null) return ValueTask.CompletedTask;

slice.PublishMessage(
new Bug2545Command(slice.Snapshot.Id),
new MessageMetadata(slice.TenantId)
{
CorrelationId = CorrelationId,
CausationId = CausationId
});

return ValueTask.CompletedTask;
}
}

public static class Bug2545SideEffectHandler
{
public static readonly List<Bug2545Command> Received = new();
public static string? ObservedCorrelationId;
public static string? ObservedSessionCausationId;
public static string? ObservedSessionCorrelationId;

public static void Handle(Bug2545Command cmd, IMessageContext context, IDocumentSession session)
{
Received.Add(cmd);
ObservedCorrelationId = context.CorrelationId;
ObservedSessionCausationId = session.CausationId;
ObservedSessionCorrelationId = session.CorrelationId;
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using JasperFx.Events;
using Marten;
using Marten.Events.Aggregation;
using Marten.Internal.Sessions;
Expand All @@ -15,6 +16,41 @@ public ValueTask PublishAsync<T>(T message, string tenantId)
return Context.PublishAsync(message, new DeliveryOptions { TenantId = tenantId });
}

/// <summary>
/// Metadata-aware overload backing <see cref="IMessageSink.PublishAsync{T}(T, MessageMetadata)"/>
/// (JasperFx.Events 1.29+). Maps the incoming <see cref="MessageMetadata"/>
/// onto a <see cref="DeliveryOptions"/> so projection-authored side-effect
/// messages can override tenant, correlation id, causation id, and headers
/// on a per-message basis. See https://github.com/JasperFx/wolverine/issues/2545.
/// </summary>
public ValueTask PublishAsync<T>(T message, MessageMetadata metadata)
{
var options = new DeliveryOptions
{
TenantId = metadata.TenantId
};

if (metadata.CorrelationIdEnabled)
{
options.CorrelationId = metadata.CorrelationId;
}

if (metadata.CausationIdEnabled)
{
options.CausationId = metadata.CausationId;
}

if (metadata.HeadersEnabled)
{
foreach (var header in metadata.Headers!)
{
options.Headers[header.Key] = header.Value?.ToString();
}
}

return Context.PublishAsync(message, options);
}

public Task AfterCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token)
{
return Context.FlushOutgoingMessagesAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,20 @@ public IDocumentSession OpenSession(MessageContext context, string? tenantId)
private void configureSession(MessageContext context, IDocumentSession session)
{
context.OverrideStorage(MessageStore);

if (context.ConversationId != Guid.Empty)

// Per-message CausationId override supplied via
// DeliveryOptions.CausationId (envelope header "causation-id") takes
// precedence over the default Wolverine ConversationId-based causation
// chain. This is how a projection that calls
// slice.PublishMessage(cmd, metadata with CausationId = ...) gets the
// overridden id onto the events the command's handler writes.
if (context.Envelope is { } env
&& env.Headers.TryGetValue(EnvelopeConstants.CausationIdKey, out var headerCausationId)
&& !string.IsNullOrEmpty(headerCausationId))
{
session.CausationId = headerCausationId;
}
else if (context.ConversationId != Guid.Empty)
{
session.CausationId = context.ConversationId.ToString();
}
Expand Down
28 changes: 28 additions & 0 deletions src/Wolverine/DeliveryOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ public TimeSpan DeliverWithin
/// </summary>
public string? TenantId { get; set; }

/// <summary>
/// Override the correlation id stamped on the outgoing envelope. Defaults to
/// the sending <see cref="IMessageContext"/>'s correlation id when not set.
/// </summary>
public string? CorrelationId { get; set; }

/// <summary>
/// Override the causation id stamped on the outgoing envelope. Defaults to
/// the sending <see cref="IMessageContext"/>'s conversation id when not set.
/// </summary>
public string? CausationId { get; set; }

/// <summary>
/// Mimetype of the serialized data
/// </summary>
Expand Down Expand Up @@ -126,6 +138,22 @@ internal void Override(Envelope envelope)
envelope.TenantId = TenantId;
}

if (CorrelationId.IsNotEmpty())
{
envelope.CorrelationId = CorrelationId;
}

if (CausationId.IsNotEmpty())
{
// Wolverine's native causation chain is Envelope.ConversationId (Guid), but
// application-meaningful CausationId values are arbitrary strings (matching
// JasperFx.IMetadataContext). Stash it as a well-known header so downstream
// consumers — most notably Wolverine.Marten's OutboxedSessionFactory when it
// configures session.CausationId — can surface it without collapsing the
// Guid-typed ConversationId in the process.
envelope.Headers[EnvelopeConstants.CausationIdKey] = CausationId;
}

if (ContentType != null)
{
envelope.ContentType = ContentType;
Expand Down
1 change: 1 addition & 0 deletions src/Wolverine/EnvelopeConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ public static class EnvelopeConstants
public const string TopicNameKey = "topic-name";
public const string KeepUntilKey = "keep-until";
public const string UserNameKey = "user-name";
public const string CausationIdKey = "causation-id";
}
8 changes: 7 additions & 1 deletion src/Wolverine/Runtime/MessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,13 @@ private void trackEnvelopeCorrelation(Activity? activity, Envelope[] outgoing)
internal virtual void TrackEnvelopeCorrelation(Envelope outbound, Activity? activity)
{
outbound.Source = Runtime.Options.ServiceName;
outbound.CorrelationId = CorrelationId;
// DeliveryOptions.Override may have already stamped a per-message
// CorrelationId (e.g. from a Marten projection's RaiseSideEffects
// call passing MessageMetadata) — don't clobber it. See GH-2545.
if (outbound.CorrelationId.IsEmpty())
{
outbound.CorrelationId = CorrelationId;
}
outbound.ConversationId = outbound.Id; // the message chain originates here
outbound.TenantId ??= TenantId; // don't override a tenant id that's specifically set on the envelope itself

Expand Down
Loading