Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using JasperFx.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Wolverine.ComplianceTests.Compliance;
using Wolverine.Runtime;
using Wolverine.Tracking;
using Wolverine.Transports.SharedMemory;
using Xunit;

namespace CoreTests.Runtime.Scheduled;

public class inner_envelope_is_stamped_before_serialization : IAsyncLifetime
{
private IHost _host = null!;

public async Task InitializeAsync()
{
ScheduledEnvelopeCapture.Reset();

_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.EnableRelayOfUserName = true;
opts.PublishAllMessages().ToSharedMemoryTopic("scheduled_tenant_topic");
opts.ListenToSharedMemorySubscription("scheduled_tenant_topic", "sub").ProcessInline();
}).StartAsync();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

[Fact]
public async Task scheduled_send_to_non_native_transport_preserves_context_fields()
{
var bus = (MessageBus)_host.Services.GetRequiredService<IMessageBus>();
bus.TenantId = "red";
bus.CorrelationId = "corr-123";
bus.UserName = "alice";

var tracked = await _host.TrackActivity()
.Timeout(10.Seconds())
.ExecuteAndWaitAsync(_ =>
bus.PublishAsync(new Message1(), new DeliveryOptions { ScheduleDelay = 1.Hours() }).AsTask());

await tracked.PlayScheduledMessagesAsync(2.Hours());

var captured = await ScheduledEnvelopeCapture.WaitAsync(5.Seconds());
captured.TenantId.ShouldBe("red");
captured.CorrelationId.ShouldBe("corr-123");
captured.UserName.ShouldBe("alice");
}
}

public class Message1CapturingHandler
{
public void Handle(Message1 _, Envelope envelope)
{
ScheduledEnvelopeCapture.Capture(envelope);
}
}

public static class ScheduledEnvelopeCapture
{
public record Snapshot(string? TenantId, string? CorrelationId, string? UserName);

private static TaskCompletionSource<Snapshot> _tcs =
new(TaskCreationOptions.RunContinuationsAsynchronously);

public static void Reset()
{
_tcs = new TaskCompletionSource<Snapshot>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public static void Capture(Envelope envelope)
{
_tcs.TrySetResult(new Snapshot(envelope.TenantId, envelope.CorrelationId, envelope.UserName));
}

public static Task<Snapshot> WaitAsync(TimeSpan timeout) => _tcs.Task.WaitAsync(timeout);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
using IntegrationTests;
using JasperFx.Core;
using Marten;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using JasperFx;
using JasperFx.Resources;
using Shouldly;
using Wolverine.Marten;
using Wolverine.Tracking;
using Xunit;

namespace Wolverine.RabbitMQ.Tests;

public class scheduled_saga_timeout_preserves_tenant : IAsyncLifetime
{
private IHost _host = null!;
private string _queueName = null!;

public async Task InitializeAsync()
{
SagaTimeoutCapture.Reset();

_queueName = RabbitTesting.NextQueueName();

_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "scheduled_saga_tenant";
m.Policies.AllDocumentsAreMultiTenanted();
m.AutoCreateSchemaObjects = AutoCreate.All;
m.DisableNpgsqlLogging = true;
})
.IntegrateWithWolverine()
.UseLightweightSessions();

opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup();

opts.ListenToRabbitQueue(_queueName);
opts.PublishMessage<SagaTimeoutMsg>().ToRabbitQueue(_queueName);

opts.Policies.UseDurableInboxOnAllListeners();
opts.Policies.UseDurableOutboxOnAllSendingEndpoints();
opts.Policies.AutoApplyTransactions();

opts.Durability.ScheduledJobFirstExecution = 500.Milliseconds();
opts.Durability.ScheduledJobPollingTime = 1.Seconds();

opts.Services.AddResourceSetupOnStartup();
}).StartAsync();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

[Fact]
public async Task saga_timeout_delivered_through_rabbit_is_handled_under_original_tenant()
{
var sagaId = Guid.NewGuid();

await _host.MessageBus().InvokeForTenantAsync("red", new StartSaga(sagaId));

SagaTimeoutCapture.Snapshot captured;
try
{
captured = await SagaTimeoutCapture.WaitAsync(30.Seconds());
}
catch (TimeoutException)
{
var store = _host.Services.GetRequiredService<IDocumentStore>();
var snapshots = new List<string>();
foreach (var tenant in new[] { "red", "*DEFAULT*" })
{
await using var diag = store.QuerySession(tenant);
var row = await diag.LoadAsync<TenantedRabbitSaga>(sagaId);
if (row != null)
{
snapshots.Add($"tenant={tenant}, storedTenant={row.StoredTenantId ?? "null"}, timedOut={row.TimedOut}");
}
}
throw new ShouldAssertException(
$"The scheduled saga timeout never reached the saga handler within 30s — " +
$"the TimeoutMessage was silently dropped because the saga lookup missed. " +
$"Saga rows for id {sagaId}: " +
(snapshots.Count > 0 ? string.Join(" | ", snapshots) : "none"));
}

captured.WasFound.ShouldBeTrue();
captured.EnvelopeTenantId.ShouldBe("red");
captured.EnvelopeSagaId.ShouldBe(sagaId.ToString());
captured.ContextTenantId.ShouldBe("red");

// After Handle ran and called MarkCompleted, Marten removes the saga row.
var store2 = _host.Services.GetRequiredService<IDocumentStore>();
await using var session = store2.QuerySession();
var remaining = await session.Query<TenantedRabbitSaga>()
.Where(x => x.Id == sagaId)
.ToListAsync();

remaining.ShouldBeEmpty();
}
}

public record StartSaga(Guid SagaId);

public record SagaTimeoutMsg(Guid SagaId) : TimeoutMessage(2.Seconds());

public class TenantedRabbitSaga : Saga
{
public Guid Id { get; set; }
public string? StoredTenantId { get; set; }
public bool TimedOut { get; set; }

public static (TenantedRabbitSaga, SagaTimeoutMsg) Start(StartSaga cmd, Envelope envelope)
{
return (
new TenantedRabbitSaga { Id = cmd.SagaId, StoredTenantId = envelope.TenantId },
new SagaTimeoutMsg(cmd.SagaId)
);
}

public void Handle(SagaTimeoutMsg timeout, Envelope envelope, IMessageContext context)
{
TimedOut = true;
SagaTimeoutCapture.Capture(envelope, context, wasFound: true);
MarkCompleted();
}
}

public static class SagaTimeoutCapture
{
public record Snapshot(
string? EnvelopeTenantId,
string? EnvelopeSagaId,
string? ContextTenantId,
bool WasFound);

private static TaskCompletionSource<Snapshot> _tcs =
new(TaskCreationOptions.RunContinuationsAsynchronously);

public static void Reset()
{
_tcs = new TaskCompletionSource<Snapshot>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public static void Capture(Envelope envelope, IMessageContext context, bool wasFound)
{
_tcs.TrySetResult(new Snapshot(envelope.TenantId, envelope.SagaId, context.TenantId, wasFound));
}

public static Task<Snapshot> WaitAsync(TimeSpan timeout) => _tcs.Task.WaitAsync(timeout);
}
31 changes: 21 additions & 10 deletions src/Wolverine/Runtime/MessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public ValueTask SendAsync<T>(T message, DeliveryOptions? options = null)
{
throw new ArgumentNullException(nameof(message));
}

// Check for both so you don't get an infinite loop
// from TimeoutMessage
if (options == null && message is ISendMyself m)
Expand Down Expand Up @@ -347,24 +347,35 @@ private void trackEnvelopeCorrelation(Activity? activity, Envelope[] outgoing)

internal virtual void TrackEnvelopeCorrelation(Envelope outbound, Activity? activity)
{
outbound.Source = Runtime.Options.ServiceName;
StampEnvelope(outbound);
outbound.ConversationId = outbound.Id; // the message chain originates here
outbound.ParentId = activity?.Id;
outbound.Store = Storage;

// For scheduled wraps the transport serializes the inner envelope, so the
// context fields stamped above also have to land on the inner.
if (outbound is { MessageType: TransportConstants.ScheduledEnvelope, Message: Envelope inner })
{
StampEnvelope(inner);
}
}

internal virtual void StampEnvelope(Envelope envelope)
{
envelope.Source = Runtime.Options.ServiceName;
// DeliveryOptions.Override may have already stamped a per-message
// CorrelationId (e.g. from a Marten projection's RaiseSideEffects
// call passing MessageMetadata) — don't clobber it. See GH-2545.
if (outbound.CorrelationId.IsEmpty())
if (envelope.CorrelationId.IsEmpty())
{
outbound.CorrelationId = CorrelationId;
envelope.CorrelationId = CorrelationId;
}
outbound.ConversationId = outbound.Id; // the message chain originates here
outbound.TenantId ??= TenantId; // don't override a tenant id that's specifically set on the envelope itself
envelope.TenantId ??= TenantId;

if (Runtime.Options.EnableRelayOfUserName)
{
outbound.UserName ??= UserName;
envelope.UserName ??= UserName;
}

outbound.ParentId = activity?.Id;
outbound.Store = Storage;
}

internal async ValueTask PersistOrSendAsync(params Envelope[] outgoing)
Expand Down
7 changes: 6 additions & 1 deletion src/Wolverine/Runtime/MessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,6 @@ private async Task flushScheduledMessagesAsync()
internal override void TrackEnvelopeCorrelation(Envelope outbound, Activity? activity)
{
base.TrackEnvelopeCorrelation(outbound, activity);
outbound.SagaId = _sagaId?.ToString() ?? Envelope?.SagaId ?? outbound.SagaId;

if (ConversationId != Guid.Empty)
{
Expand All @@ -807,6 +806,12 @@ internal override void TrackEnvelopeCorrelation(Envelope outbound, Activity? act
}
}

internal override void StampEnvelope(Envelope envelope)
{
base.StampEnvelope(envelope);
envelope.SagaId = _sagaId?.ToString() ?? Envelope?.SagaId ?? envelope.SagaId;
}

public void OverrideStorage(IMessageStore messageStore)
{
Storage = messageStore;
Expand Down
9 changes: 7 additions & 2 deletions src/Wolverine/Runtime/Scheduled/EnvelopeReaderWriter.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using JasperFx.Core.Reflection;
using Wolverine.Runtime.Serialization;
using Wolverine.Transports;

Expand Down Expand Up @@ -33,6 +32,12 @@ public byte[] WriteMessage(object message)

public byte[] Write(Envelope model)
{
return EnvelopeSerializer.Serialize(model.As<Envelope>());
if (model.Message is not Envelope inner)
{
throw new InvalidOperationException(
$"{nameof(EnvelopeReaderWriter)} can only serialize a scheduled-wrap envelope whose Message is the inner Envelope, but got {model.Message?.GetType().FullName ?? "null"}.");
}

return EnvelopeSerializer.Serialize(inner);
}
}
2 changes: 0 additions & 2 deletions src/Wolverine/Runtime/Scheduled/EnvelopeScheduleExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using Wolverine.Runtime.Serialization;
using Wolverine.Transports;
using Wolverine.Transports.Sending;

Expand All @@ -18,7 +17,6 @@ public static Envelope ForScheduledSend(this Envelope envelope, ISendingAgent? s
Status = EnvelopeStatus.Scheduled,
OwnerId = TransportConstants.AnyNode,
Sender = sender,
Data = EnvelopeSerializer.Serialize(envelope),
TopicName = envelope.TopicName
};
}
Expand Down
Loading