diff --git a/src/Testing/CoreTests/Runtime/Scheduled/inner_envelope_is_stamped_before_serialization.cs b/src/Testing/CoreTests/Runtime/Scheduled/inner_envelope_is_stamped_before_serialization.cs new file mode 100644 index 000000000..4d6664a8e --- /dev/null +++ b/src/Testing/CoreTests/Runtime/Scheduled/inner_envelope_is_stamped_before_serialization.cs @@ -0,0 +1,83 @@ +using JasperFx.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine.ComplianceTests.Compliance; +using Wolverine.Runtime; +using Wolverine.Tracking; +using Wolverine.Transports.SharedMemory; +using Xunit; + +namespace CoreTests.Runtime.Scheduled; + +public class inner_envelope_is_stamped_before_serialization : IAsyncLifetime +{ + private IHost _host = null!; + + public async Task InitializeAsync() + { + ScheduledEnvelopeCapture.Reset(); + + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.EnableRelayOfUserName = true; + opts.PublishAllMessages().ToSharedMemoryTopic("scheduled_tenant_topic"); + opts.ListenToSharedMemorySubscription("scheduled_tenant_topic", "sub").ProcessInline(); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task scheduled_send_to_non_native_transport_preserves_context_fields() + { + var bus = (MessageBus)_host.Services.GetRequiredService(); + bus.TenantId = "red"; + bus.CorrelationId = "corr-123"; + bus.UserName = "alice"; + + var tracked = await _host.TrackActivity() + .Timeout(10.Seconds()) + .ExecuteAndWaitAsync(_ => + bus.PublishAsync(new Message1(), new DeliveryOptions { ScheduleDelay = 1.Hours() }).AsTask()); + + await tracked.PlayScheduledMessagesAsync(2.Hours()); + + var captured = await ScheduledEnvelopeCapture.WaitAsync(5.Seconds()); + captured.TenantId.ShouldBe("red"); + captured.CorrelationId.ShouldBe("corr-123"); + captured.UserName.ShouldBe("alice"); + } +} + +public class Message1CapturingHandler +{ + public void Handle(Message1 _, Envelope envelope) + { + ScheduledEnvelopeCapture.Capture(envelope); + } +} + +public static class ScheduledEnvelopeCapture +{ + public record Snapshot(string? TenantId, string? CorrelationId, string? UserName); + + private static TaskCompletionSource _tcs = + new(TaskCreationOptions.RunContinuationsAsynchronously); + + public static void Reset() + { + _tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + + public static void Capture(Envelope envelope) + { + _tcs.TrySetResult(new Snapshot(envelope.TenantId, envelope.CorrelationId, envelope.UserName)); + } + + public static Task WaitAsync(TimeSpan timeout) => _tcs.Task.WaitAsync(timeout); +} diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/scheduled_saga_timeout_preserves_tenant.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/scheduled_saga_timeout_preserves_tenant.cs new file mode 100644 index 000000000..e899aeaa2 --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/scheduled_saga_timeout_preserves_tenant.cs @@ -0,0 +1,158 @@ +using IntegrationTests; +using JasperFx.Core; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using JasperFx; +using JasperFx.Resources; +using Shouldly; +using Wolverine.Marten; +using Wolverine.Tracking; +using Xunit; + +namespace Wolverine.RabbitMQ.Tests; + +public class scheduled_saga_timeout_preserves_tenant : IAsyncLifetime +{ + private IHost _host = null!; + private string _queueName = null!; + + public async Task InitializeAsync() + { + SagaTimeoutCapture.Reset(); + + _queueName = RabbitTesting.NextQueueName(); + + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services.AddMarten(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "scheduled_saga_tenant"; + m.Policies.AllDocumentsAreMultiTenanted(); + m.AutoCreateSchemaObjects = AutoCreate.All; + m.DisableNpgsqlLogging = true; + }) + .IntegrateWithWolverine() + .UseLightweightSessions(); + + opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup(); + + opts.ListenToRabbitQueue(_queueName); + opts.PublishMessage().ToRabbitQueue(_queueName); + + opts.Policies.UseDurableInboxOnAllListeners(); + opts.Policies.UseDurableOutboxOnAllSendingEndpoints(); + opts.Policies.AutoApplyTransactions(); + + opts.Durability.ScheduledJobFirstExecution = 500.Milliseconds(); + opts.Durability.ScheduledJobPollingTime = 1.Seconds(); + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task saga_timeout_delivered_through_rabbit_is_handled_under_original_tenant() + { + var sagaId = Guid.NewGuid(); + + await _host.MessageBus().InvokeForTenantAsync("red", new StartSaga(sagaId)); + + SagaTimeoutCapture.Snapshot captured; + try + { + captured = await SagaTimeoutCapture.WaitAsync(30.Seconds()); + } + catch (TimeoutException) + { + var store = _host.Services.GetRequiredService(); + var snapshots = new List(); + foreach (var tenant in new[] { "red", "*DEFAULT*" }) + { + await using var diag = store.QuerySession(tenant); + var row = await diag.LoadAsync(sagaId); + if (row != null) + { + snapshots.Add($"tenant={tenant}, storedTenant={row.StoredTenantId ?? "null"}, timedOut={row.TimedOut}"); + } + } + throw new ShouldAssertException( + $"The scheduled saga timeout never reached the saga handler within 30s — " + + $"the TimeoutMessage was silently dropped because the saga lookup missed. " + + $"Saga rows for id {sagaId}: " + + (snapshots.Count > 0 ? string.Join(" | ", snapshots) : "none")); + } + + captured.WasFound.ShouldBeTrue(); + captured.EnvelopeTenantId.ShouldBe("red"); + captured.EnvelopeSagaId.ShouldBe(sagaId.ToString()); + captured.ContextTenantId.ShouldBe("red"); + + // After Handle ran and called MarkCompleted, Marten removes the saga row. + var store2 = _host.Services.GetRequiredService(); + await using var session = store2.QuerySession(); + var remaining = await session.Query() + .Where(x => x.Id == sagaId) + .ToListAsync(); + + remaining.ShouldBeEmpty(); + } +} + +public record StartSaga(Guid SagaId); + +public record SagaTimeoutMsg(Guid SagaId) : TimeoutMessage(2.Seconds()); + +public class TenantedRabbitSaga : Saga +{ + public Guid Id { get; set; } + public string? StoredTenantId { get; set; } + public bool TimedOut { get; set; } + + public static (TenantedRabbitSaga, SagaTimeoutMsg) Start(StartSaga cmd, Envelope envelope) + { + return ( + new TenantedRabbitSaga { Id = cmd.SagaId, StoredTenantId = envelope.TenantId }, + new SagaTimeoutMsg(cmd.SagaId) + ); + } + + public void Handle(SagaTimeoutMsg timeout, Envelope envelope, IMessageContext context) + { + TimedOut = true; + SagaTimeoutCapture.Capture(envelope, context, wasFound: true); + MarkCompleted(); + } +} + +public static class SagaTimeoutCapture +{ + public record Snapshot( + string? EnvelopeTenantId, + string? EnvelopeSagaId, + string? ContextTenantId, + bool WasFound); + + private static TaskCompletionSource _tcs = + new(TaskCreationOptions.RunContinuationsAsynchronously); + + public static void Reset() + { + _tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + } + + public static void Capture(Envelope envelope, IMessageContext context, bool wasFound) + { + _tcs.TrySetResult(new Snapshot(envelope.TenantId, envelope.SagaId, context.TenantId, wasFound)); + } + + public static Task WaitAsync(TimeSpan timeout) => _tcs.Task.WaitAsync(timeout); +} diff --git a/src/Wolverine/Runtime/MessageBus.cs b/src/Wolverine/Runtime/MessageBus.cs index 8ed34db08..cfff4a76e 100644 --- a/src/Wolverine/Runtime/MessageBus.cs +++ b/src/Wolverine/Runtime/MessageBus.cs @@ -230,7 +230,7 @@ public ValueTask SendAsync(T message, DeliveryOptions? options = null) { throw new ArgumentNullException(nameof(message)); } - + // Check for both so you don't get an infinite loop // from TimeoutMessage if (options == null && message is ISendMyself m) @@ -347,24 +347,35 @@ private void trackEnvelopeCorrelation(Activity? activity, Envelope[] outgoing) internal virtual void TrackEnvelopeCorrelation(Envelope outbound, Activity? activity) { - outbound.Source = Runtime.Options.ServiceName; + StampEnvelope(outbound); + outbound.ConversationId = outbound.Id; // the message chain originates here + outbound.ParentId = activity?.Id; + outbound.Store = Storage; + + // For scheduled wraps the transport serializes the inner envelope, so the + // context fields stamped above also have to land on the inner. + if (outbound is { MessageType: TransportConstants.ScheduledEnvelope, Message: Envelope inner }) + { + StampEnvelope(inner); + } + } + + internal virtual void StampEnvelope(Envelope envelope) + { + envelope.Source = Runtime.Options.ServiceName; // DeliveryOptions.Override may have already stamped a per-message // CorrelationId (e.g. from a Marten projection's RaiseSideEffects // call passing MessageMetadata) — don't clobber it. See GH-2545. - if (outbound.CorrelationId.IsEmpty()) + if (envelope.CorrelationId.IsEmpty()) { - outbound.CorrelationId = CorrelationId; + envelope.CorrelationId = CorrelationId; } - outbound.ConversationId = outbound.Id; // the message chain originates here - outbound.TenantId ??= TenantId; // don't override a tenant id that's specifically set on the envelope itself + envelope.TenantId ??= TenantId; if (Runtime.Options.EnableRelayOfUserName) { - outbound.UserName ??= UserName; + envelope.UserName ??= UserName; } - - outbound.ParentId = activity?.Id; - outbound.Store = Storage; } internal async ValueTask PersistOrSendAsync(params Envelope[] outgoing) diff --git a/src/Wolverine/Runtime/MessageContext.cs b/src/Wolverine/Runtime/MessageContext.cs index 0b84759b2..8bfba333d 100644 --- a/src/Wolverine/Runtime/MessageContext.cs +++ b/src/Wolverine/Runtime/MessageContext.cs @@ -794,7 +794,6 @@ private async Task flushScheduledMessagesAsync() internal override void TrackEnvelopeCorrelation(Envelope outbound, Activity? activity) { base.TrackEnvelopeCorrelation(outbound, activity); - outbound.SagaId = _sagaId?.ToString() ?? Envelope?.SagaId ?? outbound.SagaId; if (ConversationId != Guid.Empty) { @@ -807,6 +806,12 @@ internal override void TrackEnvelopeCorrelation(Envelope outbound, Activity? act } } + internal override void StampEnvelope(Envelope envelope) + { + base.StampEnvelope(envelope); + envelope.SagaId = _sagaId?.ToString() ?? Envelope?.SagaId ?? envelope.SagaId; + } + public void OverrideStorage(IMessageStore messageStore) { Storage = messageStore; diff --git a/src/Wolverine/Runtime/Scheduled/EnvelopeReaderWriter.cs b/src/Wolverine/Runtime/Scheduled/EnvelopeReaderWriter.cs index 3c363eb11..dbee4a566 100644 --- a/src/Wolverine/Runtime/Scheduled/EnvelopeReaderWriter.cs +++ b/src/Wolverine/Runtime/Scheduled/EnvelopeReaderWriter.cs @@ -1,4 +1,3 @@ -using JasperFx.Core.Reflection; using Wolverine.Runtime.Serialization; using Wolverine.Transports; @@ -33,6 +32,12 @@ public byte[] WriteMessage(object message) public byte[] Write(Envelope model) { - return EnvelopeSerializer.Serialize(model.As()); + if (model.Message is not Envelope inner) + { + throw new InvalidOperationException( + $"{nameof(EnvelopeReaderWriter)} can only serialize a scheduled-wrap envelope whose Message is the inner Envelope, but got {model.Message?.GetType().FullName ?? "null"}."); + } + + return EnvelopeSerializer.Serialize(inner); } } \ No newline at end of file diff --git a/src/Wolverine/Runtime/Scheduled/EnvelopeScheduleExtensions.cs b/src/Wolverine/Runtime/Scheduled/EnvelopeScheduleExtensions.cs index ada4f2415..e6558b2ab 100644 --- a/src/Wolverine/Runtime/Scheduled/EnvelopeScheduleExtensions.cs +++ b/src/Wolverine/Runtime/Scheduled/EnvelopeScheduleExtensions.cs @@ -1,4 +1,3 @@ -using Wolverine.Runtime.Serialization; using Wolverine.Transports; using Wolverine.Transports.Sending; @@ -18,7 +17,6 @@ public static Envelope ForScheduledSend(this Envelope envelope, ISendingAgent? s Status = EnvelopeStatus.Scheduled, OwnerId = TransportConstants.AnyNode, Sender = sender, - Data = EnvelopeSerializer.Serialize(envelope), TopicName = envelope.TopicName }; }