diff --git a/Directory.Packages.props b/Directory.Packages.props index 3896476ad..63dc13cc2 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -29,15 +29,15 @@ - + - + - + @@ -100,13 +100,13 @@ - - - - - - - + + + + + + + diff --git a/docs/guide/durability/marten/event-forwarding.md b/docs/guide/durability/marten/event-forwarding.md index 3c4d63cd9..d30115df0 100644 --- a/docs/guide/durability/marten/event-forwarding.md +++ b/docs/guide/durability/marten/event-forwarding.md @@ -180,3 +180,53 @@ public static Task HandleAsync(SecondMessage message, IDocumentSession session) ``` snippet source | anchor + +## Overriding Side-Effect Message Metadata + +When a Marten projection publishes a Wolverine message from `RaiseSideEffects` via `slice.PublishMessage(...)`, the resulting Wolverine envelope is built by an internal `MessageContext` that has no inherent knowledge of the originating event's metadata. By default the outgoing message ends up with no correlation id, no causation id, and an envelope-level conversation id rooted at its own envelope id — which means the chain Event A → side-effect command → Event B does not naturally share a correlation id. + +The metadata-aware overload of `PublishMessage` (JasperFx.Events 1.29+) lets the projection author override the per-message metadata that the side-effect command's envelope (and the Marten session opened for its handler) will inherit: + +```csharp +public class TodoCloserProjection : MultiStreamProjection +{ + public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice slice) + { + if (slice.Snapshot is null) return ValueTask.CompletedTask; + + // Carry the originating event's correlation id (and optionally its + // causation id) onto the command we're emitting, so the handler that + // closes the task can match against it. + var correlationId = slice.Events() + .Select(e => e.CorrelationId) + .FirstOrDefault(id => id is not null); + + slice.PublishMessage( + new CloseTodoTask(slice.Snapshot.Id), + new MessageMetadata(slice.TenantId) + { + CorrelationId = correlationId, + CausationId = slice.Events().Last().Id.ToString() + }); + + return ValueTask.CompletedTask; + } +} +``` + +What the override actually does inside Wolverine: + +| `MessageMetadata` field | Effect on the outgoing envelope | Effect on the receiving handler | +|---|---|---| +| `TenantId` | `envelope.TenantId` (also drives transport routing for tenanted endpoints) | `IMessageContext.TenantId`, scoped `DbContext`/`IDocumentSession` tenant | +| `CorrelationId` | `envelope.CorrelationId` (passes through `MessageBus.TrackEnvelopeCorrelation` without being clobbered) | `IMessageContext.CorrelationId`, `session.CorrelationId` (Marten) | +| `CausationId` | Stored as `envelope.Headers["causation-id"]` because Wolverine's native `Envelope.ConversationId` is `Guid`-typed and would lose information | `session.CausationId` (Marten) — `OutboxedSessionFactory` reads the header in preference to the default `ConversationId.ToString()` chain | +| `Headers` | Each entry copied onto `envelope.Headers` (string-converted) | Available via `envelope.Headers` on the receiving side | + +The metadata-less form (`slice.PublishMessage(message)`) is unchanged and remains the right call when you don't need per-message overrides. + +### When you actually need this + +The motivating case is a "todo-list" projection: Event A opens a task keyed by some correlation id, Event B (emitted by the handler of a side-effect command) closes the task with the same key. Without the metadata override, Event B carries a null correlation id and the close never matches. + +The same shape applies to anywhere you want a chain of derived events/commands to share a business-meaningful identifier — distributed tracing keyed on `correlation-id`, idempotency keys, tenant-scoped audit threading, etc. Pick the metadata fields that match your business need; you don't have to set all of them. diff --git a/src/Persistence/MartenTests/Bugs/Bug_2545_raise_side_effects_with_metadata_override.cs b/src/Persistence/MartenTests/Bugs/Bug_2545_raise_side_effects_with_metadata_override.cs new file mode 100644 index 000000000..7898285ee --- /dev/null +++ b/src/Persistence/MartenTests/Bugs/Bug_2545_raise_side_effects_with_metadata_override.cs @@ -0,0 +1,141 @@ +using IntegrationTests; +using JasperFx; +using JasperFx.Core; +using JasperFx.Events; +using JasperFx.Events.Daemon; +using JasperFx.Events.Grouping; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events.Aggregation; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Tracking; + +namespace MartenTests.Bugs; + +/// +/// GH-2545: when a Marten projection publishes a side-effect message via +/// , the +/// resulting Wolverine envelope — and the Marten IDocumentSession that +/// the handler opens for that envelope — must inherit the user-supplied +/// and +/// . This is the "todo-list" pattern +/// the issue describes: Event A opens a task keyed by a correlation id, +/// Event B (emitted by the handler of a side-effect command) closes the +/// task with the same correlation id. +/// +public class Bug_2545_raise_side_effects_with_metadata_override +{ + [Fact] + public async Task metadata_on_PublishMessage_flows_to_handler_context_and_marten_session() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + + opts.Services.AddMarten(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "bug_2545"; + m.Projections.Add(ProjectionLifecycle.Async); + m.DisableNpgsqlLogging = true; + }) + .IntegrateWithWolverine() + .AddAsyncDaemon(DaemonMode.Solo); + + opts.Policies.UseDurableLocalQueues(); + + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(typeof(Bug2545SideEffectHandler)); + }).StartAsync(); + + var streamId = Guid.NewGuid(); + Bug2545SideEffectHandler.Received.Clear(); + + // Clear any prior state from earlier test runs. + var store = host.Services.GetRequiredService(); + await store.Advanced.Clean.CompletelyRemoveAllAsync(); + + var tracked = await host + .TrackActivity() + .Timeout(30.Seconds()) + .WaitForMessageToBeReceivedAt(host) + .ExecuteAndWaitAsync((Func)(async _ => + { + await using var session = store.LightweightSession(); + session.Events.StartStream(streamId, new Bug2545Triggered()); + await session.SaveChangesAsync(); + })); + + tracked.Executed.SingleMessage().ShouldNotBeNull(); + + // 1) The handler observed the user-supplied correlation id. + Bug2545SideEffectHandler.ObservedCorrelationId.ShouldBe(Bug2545Projection.CorrelationId); + + // 2) The handler's MessageContext rolled the causation override from + // the envelope header onto the IDocumentSession, so any events the + // handler had appended would carry CausationId = the user override + // rather than Wolverine's default Guid-typed ConversationId string. + Bug2545SideEffectHandler.ObservedSessionCausationId.ShouldBe(Bug2545Projection.CausationId); + + // 3) And CorrelationId on the session matches (parity with the default + // OutboxedSessionFactory behavior on the non-override path). + Bug2545SideEffectHandler.ObservedSessionCorrelationId.ShouldBe(Bug2545Projection.CorrelationId); + } +} + +public record Bug2545Triggered; + +public record Bug2545Command(Guid AggregateId); + +public class Bug2545Aggregate +{ + public Guid Id { get; set; } + + public static Bug2545Aggregate Create(Bug2545Triggered _) => new(); +} + +public class Bug2545Projection : SingleStreamProjection +{ + public const string CorrelationId = "todo-correlation-42"; + public const string CausationId = "caused-by-triggered-event"; + + public static Bug2545Aggregate Create(Bug2545Triggered _) => new(); + + public override ValueTask RaiseSideEffects( + Marten.IDocumentOperations operations, + IEventSlice slice) + { + if (slice.Snapshot is null) return ValueTask.CompletedTask; + + slice.PublishMessage( + new Bug2545Command(slice.Snapshot.Id), + new MessageMetadata(slice.TenantId) + { + CorrelationId = CorrelationId, + CausationId = CausationId + }); + + return ValueTask.CompletedTask; + } +} + +public static class Bug2545SideEffectHandler +{ + public static readonly List Received = new(); + public static string? ObservedCorrelationId; + public static string? ObservedSessionCausationId; + public static string? ObservedSessionCorrelationId; + + public static void Handle(Bug2545Command cmd, IMessageContext context, IDocumentSession session) + { + Received.Add(cmd); + ObservedCorrelationId = context.CorrelationId; + ObservedSessionCausationId = session.CausationId; + ObservedSessionCorrelationId = session.CorrelationId; + } +} diff --git a/src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineMessageBatch.cs b/src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineMessageBatch.cs index e9ab96737..9d511b242 100644 --- a/src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineMessageBatch.cs +++ b/src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineMessageBatch.cs @@ -1,3 +1,4 @@ +using JasperFx.Events; using Marten; using Marten.Events.Aggregation; using Marten.Internal.Sessions; @@ -15,6 +16,41 @@ public ValueTask PublishAsync(T message, string tenantId) return Context.PublishAsync(message, new DeliveryOptions { TenantId = tenantId }); } + /// + /// Metadata-aware overload backing + /// (JasperFx.Events 1.29+). Maps the incoming + /// onto a so projection-authored side-effect + /// messages can override tenant, correlation id, causation id, and headers + /// on a per-message basis. See https://github.com/JasperFx/wolverine/issues/2545. + /// + public ValueTask PublishAsync(T message, MessageMetadata metadata) + { + var options = new DeliveryOptions + { + TenantId = metadata.TenantId + }; + + if (metadata.CorrelationIdEnabled) + { + options.CorrelationId = metadata.CorrelationId; + } + + if (metadata.CausationIdEnabled) + { + options.CausationId = metadata.CausationId; + } + + if (metadata.HeadersEnabled) + { + foreach (var header in metadata.Headers!) + { + options.Headers[header.Key] = header.Value?.ToString(); + } + } + + return Context.PublishAsync(message, options); + } + public Task AfterCommitAsync(IDocumentSession session, IChangeSet commit, CancellationToken token) { return Context.FlushOutgoingMessagesAsync(); diff --git a/src/Persistence/Wolverine.Marten/Publishing/OutboxedSessionFactory.cs b/src/Persistence/Wolverine.Marten/Publishing/OutboxedSessionFactory.cs index e6b6537bd..6577d0b12 100644 --- a/src/Persistence/Wolverine.Marten/Publishing/OutboxedSessionFactory.cs +++ b/src/Persistence/Wolverine.Marten/Publishing/OutboxedSessionFactory.cs @@ -137,8 +137,20 @@ public IDocumentSession OpenSession(MessageContext context, string? tenantId) private void configureSession(MessageContext context, IDocumentSession session) { context.OverrideStorage(MessageStore); - - if (context.ConversationId != Guid.Empty) + + // Per-message CausationId override supplied via + // DeliveryOptions.CausationId (envelope header "causation-id") takes + // precedence over the default Wolverine ConversationId-based causation + // chain. This is how a projection that calls + // slice.PublishMessage(cmd, metadata with CausationId = ...) gets the + // overridden id onto the events the command's handler writes. + if (context.Envelope is { } env + && env.Headers.TryGetValue(EnvelopeConstants.CausationIdKey, out var headerCausationId) + && !string.IsNullOrEmpty(headerCausationId)) + { + session.CausationId = headerCausationId; + } + else if (context.ConversationId != Guid.Empty) { session.CausationId = context.ConversationId.ToString(); } diff --git a/src/Wolverine/DeliveryOptions.cs b/src/Wolverine/DeliveryOptions.cs index 2fcaef0a8..38713509b 100644 --- a/src/Wolverine/DeliveryOptions.cs +++ b/src/Wolverine/DeliveryOptions.cs @@ -68,6 +68,18 @@ public TimeSpan DeliverWithin /// public string? TenantId { get; set; } + /// + /// Override the correlation id stamped on the outgoing envelope. Defaults to + /// the sending 's correlation id when not set. + /// + public string? CorrelationId { get; set; } + + /// + /// Override the causation id stamped on the outgoing envelope. Defaults to + /// the sending 's conversation id when not set. + /// + public string? CausationId { get; set; } + /// /// Mimetype of the serialized data /// @@ -126,6 +138,22 @@ internal void Override(Envelope envelope) envelope.TenantId = TenantId; } + if (CorrelationId.IsNotEmpty()) + { + envelope.CorrelationId = CorrelationId; + } + + if (CausationId.IsNotEmpty()) + { + // Wolverine's native causation chain is Envelope.ConversationId (Guid), but + // application-meaningful CausationId values are arbitrary strings (matching + // JasperFx.IMetadataContext). Stash it as a well-known header so downstream + // consumers — most notably Wolverine.Marten's OutboxedSessionFactory when it + // configures session.CausationId — can surface it without collapsing the + // Guid-typed ConversationId in the process. + envelope.Headers[EnvelopeConstants.CausationIdKey] = CausationId; + } + if (ContentType != null) { envelope.ContentType = ContentType; diff --git a/src/Wolverine/EnvelopeConstants.cs b/src/Wolverine/EnvelopeConstants.cs index 30e1de845..f93036f06 100644 --- a/src/Wolverine/EnvelopeConstants.cs +++ b/src/Wolverine/EnvelopeConstants.cs @@ -27,4 +27,5 @@ public static class EnvelopeConstants public const string TopicNameKey = "topic-name"; public const string KeepUntilKey = "keep-until"; public const string UserNameKey = "user-name"; + public const string CausationIdKey = "causation-id"; } \ No newline at end of file diff --git a/src/Wolverine/Runtime/MessageBus.cs b/src/Wolverine/Runtime/MessageBus.cs index ab59278ea..8ed34db08 100644 --- a/src/Wolverine/Runtime/MessageBus.cs +++ b/src/Wolverine/Runtime/MessageBus.cs @@ -348,7 +348,13 @@ private void trackEnvelopeCorrelation(Activity? activity, Envelope[] outgoing) internal virtual void TrackEnvelopeCorrelation(Envelope outbound, Activity? activity) { outbound.Source = Runtime.Options.ServiceName; - outbound.CorrelationId = CorrelationId; + // DeliveryOptions.Override may have already stamped a per-message + // CorrelationId (e.g. from a Marten projection's RaiseSideEffects + // call passing MessageMetadata) — don't clobber it. See GH-2545. + if (outbound.CorrelationId.IsEmpty()) + { + outbound.CorrelationId = CorrelationId; + } outbound.ConversationId = outbound.Id; // the message chain originates here outbound.TenantId ??= TenantId; // don't override a tenant id that's specifically set on the envelope itself