diff --git a/src/Persistence/PostgresqlTests/scheduled_messages_use_message_store_when_AlwaysMakeScheduledMessagesDurable_is_set.cs b/src/Persistence/PostgresqlTests/scheduled_messages_use_message_store_when_AlwaysMakeScheduledMessagesDurable_is_set.cs new file mode 100644 index 000000000..ece0f9a5b --- /dev/null +++ b/src/Persistence/PostgresqlTests/scheduled_messages_use_message_store_when_AlwaysMakeScheduledMessagesDurable_is_set.cs @@ -0,0 +1,136 @@ +using System.Diagnostics; +using IntegrationTests; +using JasperFx; +using JasperFx.Core; +using JasperFx.Resources; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Logging; +using Wolverine.Persistence.Durability; +using Wolverine.Postgresql; +using Xunit; +using Xunit.Abstractions; + +namespace PostgresqlTests; + +/// +/// Verifies IPolicies.AlwaysMakeScheduledMessagesDurable() for the local-queue +/// path. BufferedLocalQueue.SupportsNativeScheduledSend reports true, but its +/// "native" scheduling is the in-process InMemoryScheduledJobProcessor — non-persistent, +/// lost on host restart. The policy redirects scheduled envelopes destined for non-durable +/// local queues to the message store inbox so they survive crashes and are recovered by the +/// scheduled-job poller. 
+/// +public class scheduled_messages_use_message_store_when_AlwaysMakeScheduledMessagesDurable_is_set : PostgresqlContext +{ + private readonly ITestOutputHelper _output; + + public scheduled_messages_use_message_store_when_AlwaysMakeScheduledMessagesDurable_is_set(ITestOutputHelper output) + { + _output = output; + } + + [Fact] + public async Task local_queue_persists_scheduled_messages_to_message_store_when_policy_is_set() + { + const string schema = "always_durable_on"; + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, schema); + opts.Durability.Mode = DurabilityMode.Solo; + + // Explicitly non-durable. Without the policy, scheduled envelopes here would + // sit in the in-process InMemoryScheduledJobProcessor and be lost on restart. + opts.LocalQueueFor().BufferedInMemory(); + opts.LocalQueueFor().BufferedInMemory(); + + opts.Policies.AlwaysMakeScheduledMessagesDurable(); + + opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState); + }).StartAsync(); + + var bus = host.MessageBus(); + var store = host.Services.GetRequiredService(); + + // Schedule both a plain message and a TimeoutMessage subtype. The policy is meant to + // cover both — TimeoutMessage isn't a special case at the scheduling decision point; + // it was just the original motivating use case (saga timeouts). + await bus.ScheduleAsync(new DurableTimeoutTestMessage(Guid.NewGuid()), 5.Minutes()); + await bus.ScheduleAsync(new DurableTimeoutTestReminder(Guid.NewGuid()), 5.Minutes()); + + var counts = await pollScheduledCountAsync(store, expected: 2); + + counts.Scheduled.ShouldBe(2); + } + + [Fact] + public async Task local_queue_uses_in_memory_scheduling_without_the_policy() + { + // Negative control: same wiring as the test above MINUS AlwaysMakeScheduledMessagesDurable. 
+ // BufferedLocalQueue's existing path stays intact — scheduled envelopes go to the + // in-process scheduler and the store's Scheduled count stays at zero. + const string schema = "always_durable_off"; + + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, schema); + opts.Durability.Mode = DurabilityMode.Solo; + + opts.LocalQueueFor().BufferedInMemory(); + opts.LocalQueueFor().BufferedInMemory(); + + opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState); + }).StartAsync(); + + var bus = host.MessageBus(); + var store = host.Services.GetRequiredService(); + + await bus.ScheduleAsync(new DurableTimeoutTestMessage(Guid.NewGuid()), 5.Minutes()); + await bus.ScheduleAsync(new DurableTimeoutTestReminder(Guid.NewGuid()), 5.Minutes()); + + // Give any async outgoing flush a moment to complete; no scheduled rows should appear + // because the in-memory scheduler is the destination, not the message store. + await Task.Delay(500); + var counts = await store.Admin.FetchCountsAsync(); + counts.Scheduled.ShouldBe(0); + } + + private async Task pollScheduledCountAsync(IMessageStore store, int expected) + { + // BufferedLocalQueue.EnqueueOutgoingAsync hands the IMessageStore.Inbox.RescheduleExistingEnvelopeForRetryAsync + // task back to its caller, so by the time bus.ScheduleAsync returns the row should already be in the DB. + // A short poll guards against any background-task timing differences without making + // the test slow when things are working. 
+ var sw = Stopwatch.StartNew(); + PersistedCounts counts; + do + { + counts = await store.Admin.FetchCountsAsync(); + _output.WriteLine($"[POLL] Scheduled={counts.Scheduled}, Incoming={counts.Incoming}"); + if (counts.Scheduled >= expected) return counts; + await Task.Delay(100); + } while (sw.Elapsed < TimeSpan.FromSeconds(5)); + + return counts; + } +} + +public record DurableTimeoutTestMessage(Guid Id); + +// TimeoutMessage subtype to mirror the saga-timeout scenario that originally +// motivated the policy. The 5-minute delay is long enough that the handler +// cannot fire during either test's assertion window. +public record DurableTimeoutTestReminder(Guid Id) : TimeoutMessage(5.Minutes()); + +public static class DurableTimeoutTestHandler +{ + // Handlers exist purely so default local routing routes the messages to a local queue. + // Both queues are configured BufferedInMemory in the test setup. + public static void Handle(DurableTimeoutTestMessage _) { } + public static void Handle(DurableTimeoutTestReminder _) { } +} diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/scheduled_messages_use_message_store_when_AlwaysMakeScheduledMessagesDurable_is_set.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/scheduled_messages_use_message_store_when_AlwaysMakeScheduledMessagesDurable_is_set.cs new file mode 100644 index 000000000..74af598fe --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/scheduled_messages_use_message_store_when_AlwaysMakeScheduledMessagesDurable_is_set.cs @@ -0,0 +1,152 @@ +using System.Diagnostics; +using IntegrationTests; +using JasperFx; +using JasperFx.Core; +using JasperFx.Resources; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Logging; +using Wolverine.Persistence.Durability; +using Wolverine.Postgresql; +using Xunit; +using Xunit.Abstractions; + +namespace Wolverine.RabbitMQ.Tests; + +/// +/// Companion to the 
local-queue case in PostgresqlTests for the +/// IPolicies.AlwaysMakeScheduledMessagesDurable() policy. Verifies the +/// invariant that scheduled-for-later envelopes destined for a non-durable RabbitMQ +/// sender end up persisted in the message store inbox — both with and without the +/// policy. +/// +/// RabbitMQ doesn't support native scheduled sends, so the routing layer +/// (MessageRoute.WriteEnvelope) automatically swaps such envelopes onto the +/// local://durable system queue, which writes to IMessageStore.Inbox. +/// That swap predates the policy, so the policy is effectively a no-op for RabbitMQ +/// — these tests exist to lock that down so a future change to the routing layer +/// can't silently regress durability for scheduled sends to non-native broker +/// transports. +/// +public class scheduled_messages_use_message_store_when_AlwaysMakeScheduledMessagesDurable_is_set : IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private string _queueName = null!; + private IHost _hostWithPolicy = null!; + private IHost _hostWithoutPolicy = null!; + + public scheduled_messages_use_message_store_when_AlwaysMakeScheduledMessagesDurable_is_set(ITestOutputHelper output) + { + _output = output; + } + + public async Task InitializeAsync() + { + // Distinct queue name per test run so concurrent test runs don't collide. + _queueName = RabbitTesting.NextQueueName(); + + // Two hosts in one fixture so we exercise both code paths against the same queue + // shape. Each uses its own Postgres schema so scheduled-row counts don't bleed + // between them. Neither host enables UseDurableInbox/UseDurableOutbox — the rabbit + // endpoint stays BufferedInMemory, which is the non-durable sender case under test. 
+ _hostWithPolicy = await buildHostAsync(applyPolicy: true, schemaName: "always_durable_rabbit_on"); + _hostWithoutPolicy = await buildHostAsync(applyPolicy: false, schemaName: "always_durable_rabbit_off"); + } + + private Task buildHostAsync(bool applyPolicy, string schemaName) + { + return Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup(); + + opts.PublishMessage().ToRabbitQueue(_queueName); + opts.PublishMessage().ToRabbitQueue(_queueName); + + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, schemaName); + opts.Durability.Mode = DurabilityMode.Solo; + + if (applyPolicy) + { + opts.Policies.AlwaysMakeScheduledMessagesDurable(); + } + + opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + if (_hostWithPolicy is not null) + { + await _hostWithPolicy.StopAsync(); + _hostWithPolicy.Dispose(); + } + + if (_hostWithoutPolicy is not null) + { + await _hostWithoutPolicy.StopAsync(); + _hostWithoutPolicy.Dispose(); + } + } + + [Fact] + public async Task non_durable_rabbit_sender_persists_scheduled_messages_to_message_store_when_policy_is_set() + { + var bus = _hostWithPolicy.MessageBus(); + var store = _hostWithPolicy.Services.GetRequiredService(); + + await bus.ScheduleAsync(new DurableScheduledRabbitTestMessage(Guid.NewGuid()), 5.Minutes()); + await bus.ScheduleAsync(new DurableScheduledRabbitTestReminder(Guid.NewGuid()), 5.Minutes()); + + var counts = await pollScheduledCountAsync(store, expected: 2); + + counts.Scheduled.ShouldBe(2); + } + + [Fact] + public async Task non_durable_rabbit_sender_already_persists_scheduled_messages_via_routing_swap_without_policy() + { + // Lock down the EXISTING routing-layer behavior: even without the policy, scheduled + // messages destined for a non-native-scheduling broker (RabbitMQ here) end up in + // the message store via MessageRoute.WriteEnvelope's swap to the 
local://durable + // queue. If a future refactor breaks that swap, this test catches it before the + // policy gets misdiagnosed as the only thing keeping non-native broker scheduling + // durable. + var bus = _hostWithoutPolicy.MessageBus(); + var store = _hostWithoutPolicy.Services.GetRequiredService(); + + await bus.ScheduleAsync(new DurableScheduledRabbitTestMessage(Guid.NewGuid()), 5.Minutes()); + await bus.ScheduleAsync(new DurableScheduledRabbitTestReminder(Guid.NewGuid()), 5.Minutes()); + + var counts = await pollScheduledCountAsync(store, expected: 2); + + counts.Scheduled.ShouldBe(2); + } + + private async Task pollScheduledCountAsync(IMessageStore store, int expected) + { + // Storage.Inbox writes are awaited inside the bus's publish path, so by the time + // bus.ScheduleAsync returns the row should already be in the DB. A short poll + // guards against background-task timing differences without making the test slow + // when things are working. + var sw = Stopwatch.StartNew(); + PersistedCounts counts; + do + { + counts = await store.Admin.FetchCountsAsync(); + _output.WriteLine($"[POLL] Scheduled={counts.Scheduled}, Incoming={counts.Incoming}"); + if (counts.Scheduled >= expected) return counts; + await Task.Delay(100); + } while (sw.Elapsed < TimeSpan.FromSeconds(5)); + + return counts; + } +} + +public record DurableScheduledRabbitTestMessage(Guid Id); + +// TimeoutMessage subtype to mirror saga timeouts. Same path through the bus — confirms +// the policy / routing behavior is not gated on message base type. 
+public record DurableScheduledRabbitTestReminder(Guid Id) : TimeoutMessage(5.Minutes()); diff --git a/src/Wolverine/DurabilitySettings.cs b/src/Wolverine/DurabilitySettings.cs index 621a48d10..ede09b5ae 100644 --- a/src/Wolverine/DurabilitySettings.cs +++ b/src/Wolverine/DurabilitySettings.cs @@ -119,6 +119,19 @@ internal set /// public bool DurabilityAgentEnabled { get; set; } = true; + /// + /// When true, scheduled-for-later messages destined for non-durable + /// BufferedLocalQueue instances route to + /// IMessageStore.Inbox instead of the in-process + /// IScheduledJobProcessor. Set via + /// IPolicies.AlwaysMakeScheduledMessagesDurable(). + /// + /// Other scheduling paths already provide durability without this flag — see the + /// XML doc on IPolicies.AlwaysMakeScheduledMessagesDurable() for the + /// full matrix. No-ops when no message store is configured. + /// + public bool AlwaysMakeScheduledMessagesDurable { get; set; } + /// /// How long should successfully handled messages be kept to use in idempotency checking /// diff --git a/src/Wolverine/IPolicies.cs b/src/Wolverine/IPolicies.cs index 64f21db1c..2d6a6fab6 100644 --- a/src/Wolverine/IPolicies.cs +++ b/src/Wolverine/IPolicies.cs @@ -37,6 +37,28 @@ public interface IPolicies : IEnumerable, IWithFailurePolicies /// void UseDurableOutboxOnAllSendingEndpoints(); + /// + /// Persist scheduled-for-later messages destined for non-durable local queues + /// through IMessageStore rather than the in-process + /// IScheduledJobProcessor, so they survive process restarts. The remaining + /// scheduling paths already provide durability without this policy: + /// + /// Native broker scheduling (Azure Service Bus, Pulsar, Redis, Pub/Sub): + /// persisted server-side by the broker. + /// Non-native broker senders (RabbitMQ, SQS, Kafka): the routing layer + /// (MessageRoute.WriteEnvelope) automatically swaps scheduled envelopes + /// onto the local://durable system queue, which writes to the message + /// store inbox. 
+ /// Local queues configured with UseDurableInbox(): already write + /// to the message store via DurableLocalQueue. + /// + /// The unique gap this policy plugs is the default BufferedInMemory local + /// queue case — without the policy those scheduled messages live only in memory. + /// No-ops when no message store is configured (a startup warning is emitted in + /// that case). + /// + void AlwaysMakeScheduledMessagesDurable(); + /// /// Create a policy for all listening *non local* endpoints /// diff --git a/src/Wolverine/Runtime/MessageContext.cs b/src/Wolverine/Runtime/MessageContext.cs index 3c40810c6..734719d4e 100644 --- a/src/Wolverine/Runtime/MessageContext.cs +++ b/src/Wolverine/Runtime/MessageContext.cs @@ -207,6 +207,11 @@ public async Task FlushOutgoingMessagesAsync() } else { + // Non-durable sender, no native scheduling. In current Wolverine this + // branch is effectively unreachable for non-local transports — the + // routing layer (MessageRoute.WriteEnvelope) already swaps such + // envelopes to the local://durable queue, which means Sender.IsDurable + // would be true above. Kept for defense in depth. 
+ Runtime.Logger.LogDebug("Scheduling envelope {EnvelopeId} ({MessageType}) for in-memory execution (non-durable, no native scheduling) to {Destination}", envelope.Id, envelope.MessageType, envelope.Destination); Runtime.ScheduleLocalExecutionInMemory(envelope.ScheduledTime!.Value, envelope); } diff --git a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs index e2c9ac659..eb351296c 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs @@ -63,6 +63,19 @@ public async Task StartAsync(CancellationToken cancellationToken) await _stores.Value.InitializeAsync(); + // AlwaysMakeScheduledMessagesDurable opts every non-durable scheduled send onto + // the message store inbox; if no store is configured, we silently fall through + // to in-process scheduling (lost on restart) — defeating the policy. Surface + // this as a startup warning so a misconfiguration is observable rather than a + // silent durability gap. + if (Options.Durability.AlwaysMakeScheduledMessagesDurable && Storage is NullMessageStore) + { + Logger.LogWarning( + "Policies.AlwaysMakeScheduledMessagesDurable() is set but no message store is configured. " + + "Scheduled messages will continue to use in-process scheduling and will be lost on restart. " + + "Configure a message store (e.g. 
PersistMessagesWithPostgresql) to make the policy effective."); + } + if (!Options.ExternalTransportsAreStubbed) { foreach (var configuresRuntime in Options.Transports.OfType().ToArray()) diff --git a/src/Wolverine/Transports/Local/BufferedLocalQueue.cs b/src/Wolverine/Transports/Local/BufferedLocalQueue.cs index 0e3062c58..21c9be347 100644 --- a/src/Wolverine/Transports/Local/BufferedLocalQueue.cs +++ b/src/Wolverine/Transports/Local/BufferedLocalQueue.cs @@ -1,5 +1,6 @@ using Wolverine.Configuration; using Wolverine.Logging; +using Wolverine.Persistence.Durability; using Wolverine.Runtime; using Wolverine.Runtime.WorkerQueues; using Wolverine.Transports.Sending; @@ -64,6 +65,26 @@ Task IListenerCircuit.EnqueueDirectlyAsync(IEnumerable envelopes) public ValueTask EnqueueOutgoingAsync(Envelope envelope) { + // AlwaysMakeScheduledMessagesDurable: BufferedLocalQueue's "native" scheduling + // is the in-process IScheduledJobProcessor — non-persistent, lost on restart. + // The policy redirects scheduled envelopes to the durable inbox so they're + // recovered by the scheduled-job poller after a crash. Recovery-path enqueues + // (IListenerCircuit.EnqueueDirectlyAsync) bypass this — those envelopes + // came FROM the message store and must not be re-stored. + if (envelope.IsScheduledForLater(DateTimeOffset.UtcNow) + && _runtime.Options.Durability.AlwaysMakeScheduledMessagesDurable + && _runtime.Storage is not NullMessageStore) + { + _messageTracker.Sent(envelope); + envelope.ReplyUri ??= ReplyUri; + // RescheduleExistingEnvelopeForRetryAsync is the misnamed-but-correct API for + // both new scheduled envelopes and retry rescheduling — it sets Status=Scheduled + // + OwnerId=AnyNode, then upserts via StoreIncomingAsync. Mirrors the call in + // MessageContext.flushScheduledMessagesAsync. ScheduleExecutionAsync (UPDATE-only) + // would silently no-op for envelopes that aren't already in the database. 
+ return new ValueTask(_runtime.Storage.Inbox.RescheduleExistingEnvelopeForRetryAsync(envelope)); + } + EnqueueDirectly(envelope); return ValueTask.CompletedTask; diff --git a/src/Wolverine/WolverineOptions.Policies.cs b/src/Wolverine/WolverineOptions.Policies.cs index 193454258..238f0f82c 100644 --- a/src/Wolverine/WolverineOptions.Policies.cs +++ b/src/Wolverine/WolverineOptions.Policies.cs @@ -109,6 +109,11 @@ void IPolicies.UseDurableOutboxOnAllSendingEndpoints() }); } + void IPolicies.AlwaysMakeScheduledMessagesDurable() + { + Durability.AlwaysMakeScheduledMessagesDurable = true; + } + void IPolicies.AllListeners(Action configure) { var policy = new LambdaEndpointPolicy((e, _) =>