Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
using System.Diagnostics;
using IntegrationTests;
using JasperFx;
using JasperFx.Core;
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Logging;
using Wolverine.Persistence.Durability;
using Wolverine.Postgresql;
using Xunit;
using Xunit.Abstractions;

namespace PostgresqlTests;

/// <summary>
/// Verifies <see cref="IPolicies.AlwaysMakeScheduledMessagesDurable"/> for the local-queue
/// path. <c>BufferedLocalQueue.SupportsNativeScheduledSend</c> reports <c>true</c>, but its
/// "native" scheduling is the in-process <c>InMemoryScheduledJobProcessor</c> — non-persistent,
/// lost on host restart. The policy redirects scheduled envelopes destined for non-durable
/// local queues to the message store inbox so they survive crashes and are recovered by the
/// scheduled-job poller.
/// </summary>
public class scheduled_messages_use_message_store_when_AlwaysMakeScheduledMessagesDurable_is_set : PostgresqlContext
{
private readonly ITestOutputHelper _output;

public scheduled_messages_use_message_store_when_AlwaysMakeScheduledMessagesDurable_is_set(ITestOutputHelper output)
{
_output = output;
}

[Fact]
public async Task local_queue_persists_scheduled_messages_to_message_store_when_policy_is_set()
{
const string schema = "always_durable_on";

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, schema);
opts.Durability.Mode = DurabilityMode.Solo;

// Explicitly non-durable. Without the policy, scheduled envelopes here would
// sit in the in-process InMemoryScheduledJobProcessor and be lost on restart.
opts.LocalQueueFor<DurableTimeoutTestMessage>().BufferedInMemory();
opts.LocalQueueFor<DurableTimeoutTestReminder>().BufferedInMemory();

opts.Policies.AlwaysMakeScheduledMessagesDurable();

opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
}).StartAsync();

var bus = host.MessageBus();
var store = host.Services.GetRequiredService<IMessageStore>();

// Schedule both a plain message and a TimeoutMessage subtype. The policy is meant to
// cover both — TimeoutMessage isn't a special case at the scheduling decision point;
// it was just the original motivating use case (saga timeouts).
await bus.ScheduleAsync(new DurableTimeoutTestMessage(Guid.NewGuid()), 5.Minutes());
await bus.ScheduleAsync(new DurableTimeoutTestReminder(Guid.NewGuid()), 5.Minutes());

var counts = await pollScheduledCountAsync(store, expected: 2);

counts.Scheduled.ShouldBe(2);
}

[Fact]
public async Task local_queue_uses_in_memory_scheduling_without_the_policy()
{
// Negative control: same wiring as the test above MINUS AlwaysMakeScheduledMessagesDurable.
// BufferedLocalQueue's existing path stays intact — scheduled envelopes go to the
// in-process scheduler and the store's Scheduled count stays at zero.
const string schema = "always_durable_off";

using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, schema);
opts.Durability.Mode = DurabilityMode.Solo;

opts.LocalQueueFor<DurableTimeoutTestMessage>().BufferedInMemory();
opts.LocalQueueFor<DurableTimeoutTestReminder>().BufferedInMemory();

opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
}).StartAsync();

var bus = host.MessageBus();
var store = host.Services.GetRequiredService<IMessageStore>();

await bus.ScheduleAsync(new DurableTimeoutTestMessage(Guid.NewGuid()), 5.Minutes());
await bus.ScheduleAsync(new DurableTimeoutTestReminder(Guid.NewGuid()), 5.Minutes());

// Give any async outgoing flush a moment to complete; no scheduled rows should appear
// because the in-memory scheduler is the destination, not the message store.
await Task.Delay(500);
var counts = await store.Admin.FetchCountsAsync();
counts.Scheduled.ShouldBe(0);
}

private async Task<PersistedCounts> pollScheduledCountAsync(IMessageStore store, int expected)
{
// The IMessageStore.Inbox.ScheduleExecutionAsync write is awaited inside the publish
// path, so by the time bus.ScheduleAsync returns the row should already be in the DB.
// A short poll guards against any background-task timing differences without making
// the test slow when things are working.
var sw = Stopwatch.StartNew();
PersistedCounts counts;
do
{
counts = await store.Admin.FetchCountsAsync();
_output.WriteLine($"[POLL] Scheduled={counts.Scheduled}, Incoming={counts.Incoming}");
if (counts.Scheduled >= expected) return counts;
await Task.Delay(100);
} while (sw.Elapsed < TimeSpan.FromSeconds(5));

return counts;
}
}

public record DurableTimeoutTestMessage(Guid Id);

// TimeoutMessage subtype to mirror the saga-timeout scenario that originally
// motivated the policy. The 5-minute delay is plenty long that neither test
// race-condition fires the handler during the assertion window.
public record DurableTimeoutTestReminder(Guid Id) : TimeoutMessage(5.Minutes());

public static class DurableTimeoutTestHandler
{
// Handlers exist purely so default local routing routes the messages to a local queue.
// Both queues are configured BufferedInMemory in the test setup.
public static void Handle(DurableTimeoutTestMessage _) { }
public static void Handle(DurableTimeoutTestReminder _) { }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
using System.Diagnostics;
using IntegrationTests;
using JasperFx;
using JasperFx.Core;
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine.Logging;
using Wolverine.Persistence.Durability;
using Wolverine.Postgresql;
using Xunit;
using Xunit.Abstractions;

namespace Wolverine.RabbitMQ.Tests;

/// <summary>
/// Companion to the local-queue case in PostgresqlTests for the
/// <see cref="IPolicies.AlwaysMakeScheduledMessagesDurable"/> policy. Verifies the
/// invariant that scheduled-for-later envelopes destined for a non-durable RabbitMQ
/// sender end up persisted in the message store inbox — both with and without the
/// policy.
///
/// RabbitMQ doesn't support native scheduled sends, so the routing layer
/// (<c>MessageRoute.WriteEnvelope</c>) automatically swaps such envelopes onto the
/// <c>local://durable</c> system queue, which writes to <c>IMessageStore.Inbox</c>.
/// That swap predates the policy, so the policy is effectively a no-op for RabbitMQ
/// — these tests exist to lock that down so a future change to the routing layer
/// can't silently regress durability for scheduled sends to non-native broker
/// transports.
/// </summary>
public class scheduled_messages_use_message_store_when_AlwaysMakeScheduledMessagesDurable_is_set : IAsyncLifetime
{
private readonly ITestOutputHelper _output;
private string _queueName = null!;
private IHost _hostWithPolicy = null!;
private IHost _hostWithoutPolicy = null!;

public scheduled_messages_use_message_store_when_AlwaysMakeScheduledMessagesDurable_is_set(ITestOutputHelper output)
{
_output = output;
}

public async Task InitializeAsync()
{
// Distinct queue name per test run so concurrent test runs don't collide.
_queueName = RabbitTesting.NextQueueName();

// Two hosts in one fixture so we exercise both code paths against the same queue
// shape. Each uses its own Postgres schema so scheduled-row counts don't bleed
// between them. Neither host enables UseDurableInbox/UseDurableOutbox — the rabbit
// endpoint stays BufferedInMemory, which is the case the policy targets.
_hostWithPolicy = await buildHostAsync(applyPolicy: true, schemaName: "always_durable_rabbit_on");
_hostWithoutPolicy = await buildHostAsync(applyPolicy: false, schemaName: "always_durable_rabbit_off");
}

private Task<IHost> buildHostAsync(bool applyPolicy, string schemaName)
{
return Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup();

opts.PublishMessage<DurableScheduledRabbitTestMessage>().ToRabbitQueue(_queueName);
opts.PublishMessage<DurableScheduledRabbitTestReminder>().ToRabbitQueue(_queueName);

opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, schemaName);
opts.Durability.Mode = DurabilityMode.Solo;

if (applyPolicy)
{
opts.Policies.AlwaysMakeScheduledMessagesDurable();
}

opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
}).StartAsync();
}

public async Task DisposeAsync()
{
if (_hostWithPolicy is not null)
{
await _hostWithPolicy.StopAsync();
_hostWithPolicy.Dispose();
}

if (_hostWithoutPolicy is not null)
{
await _hostWithoutPolicy.StopAsync();
_hostWithoutPolicy.Dispose();
}
}

[Fact]
public async Task non_durable_rabbit_sender_persists_scheduled_messages_to_message_store_when_policy_is_set()
{
var bus = _hostWithPolicy.MessageBus();
var store = _hostWithPolicy.Services.GetRequiredService<IMessageStore>();

await bus.ScheduleAsync(new DurableScheduledRabbitTestMessage(Guid.NewGuid()), 5.Minutes());
await bus.ScheduleAsync(new DurableScheduledRabbitTestReminder(Guid.NewGuid()), 5.Minutes());

var counts = await pollScheduledCountAsync(store, expected: 2);

counts.Scheduled.ShouldBe(2);
}

[Fact]
public async Task non_durable_rabbit_sender_already_persists_scheduled_messages_via_routing_swap_without_policy()
{
// Lock down the EXISTING routing-layer behavior: even without the policy, scheduled
// messages destined for a non-native-scheduling broker (RabbitMQ here) end up in
// the message store via MessageRoute.WriteEnvelope's swap to the local://durable
// queue. If a future refactor breaks that swap, this test catches it before the
// policy gets misdiagnosed as the only thing keeping non-native broker scheduling
// durable.
var bus = _hostWithoutPolicy.MessageBus();
var store = _hostWithoutPolicy.Services.GetRequiredService<IMessageStore>();

await bus.ScheduleAsync(new DurableScheduledRabbitTestMessage(Guid.NewGuid()), 5.Minutes());
await bus.ScheduleAsync(new DurableScheduledRabbitTestReminder(Guid.NewGuid()), 5.Minutes());

var counts = await pollScheduledCountAsync(store, expected: 2);

counts.Scheduled.ShouldBe(2);
}

private async Task<PersistedCounts> pollScheduledCountAsync(IMessageStore store, int expected)
{
// Storage.Inbox writes are awaited inside the bus's publish path, so by the time
// bus.ScheduleAsync returns the row should already be in the DB. A short poll
// guards against background-task timing differences without making the test slow
// when things are working.
var sw = Stopwatch.StartNew();
PersistedCounts counts;
do
{
counts = await store.Admin.FetchCountsAsync();
_output.WriteLine($"[POLL] Scheduled={counts.Scheduled}, Incoming={counts.Incoming}");
if (counts.Scheduled >= expected) return counts;
await Task.Delay(100);
} while (sw.Elapsed < TimeSpan.FromSeconds(5));

return counts;
}
}

public record DurableScheduledRabbitTestMessage(Guid Id);

// TimeoutMessage subtype to mirror saga timeouts. Same path through the bus — confirms
// the policy / routing behavior is not gated on message base type.
public record DurableScheduledRabbitTestReminder(Guid Id) : TimeoutMessage(5.Minutes());
13 changes: 13 additions & 0 deletions src/Wolverine/DurabilitySettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,19 @@ internal set
/// </summary>
public bool DurabilityAgentEnabled { get; set; } = true;

/// <summary>
/// When true, scheduled-for-later messages destined for non-durable
/// <see cref="Transports.Local.BufferedLocalQueue"/> instances route to
/// <c>IMessageStore.Inbox</c> instead of the in-process
/// <c>IScheduledJobProcessor</c>. Set via
/// <see cref="IPolicies.AlwaysMakeScheduledMessagesDurable"/>.
///
/// Other scheduling paths already provide durability without this flag — see the
/// XML doc on <see cref="IPolicies.AlwaysMakeScheduledMessagesDurable"/> for the
/// full matrix. No-ops when no message store is configured.
/// </summary>
public bool AlwaysMakeScheduledMessagesDurable { get; set; }

/// <summary>
/// How long should successfully handled messages be kept to use in idempotency checking
/// </summary>
Expand Down
22 changes: 22 additions & 0 deletions src/Wolverine/IPolicies.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,28 @@ public interface IPolicies : IEnumerable<IWolverinePolicy>, IWithFailurePolicies
/// </summary>
void UseDurableOutboxOnAllSendingEndpoints();

/// <summary>
/// Persist scheduled-for-later messages destined for non-durable local queues
/// through <c>IMessageStore</c> rather than the in-process
/// <c>IScheduledJobProcessor</c>, so they survive process restarts. The remaining
/// scheduling paths already provide durability without this policy:
/// <list type="bullet">
/// <item>Native broker scheduling (Azure Service Bus, Pulsar, Redis, Pub/Sub):
/// persisted server-side by the broker.</item>
/// <item>Non-native broker senders (RabbitMQ, SQS, Kafka): the routing layer
/// (<c>MessageRoute.WriteEnvelope</c>) automatically swaps scheduled envelopes
/// onto the <c>local://durable</c> system queue, which writes to the message
/// store inbox.</item>
/// <item>Local queues configured with <c>UseDurableInbox()</c>: already write
/// to the message store via <c>DurableLocalQueue</c>.</item>
/// </list>
/// The unique gap this policy plugs is the default <c>BufferedInMemory</c> local
/// queue case — without the policy those scheduled messages live only in memory.
/// No-ops when no message store is configured (a startup warning is emitted in
/// that case).
/// </summary>
void AlwaysMakeScheduledMessagesDurable();

/// <summary>
/// Create a policy for all listening *non local* endpoints
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions src/Wolverine/Runtime/MessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ public async Task FlushOutgoingMessagesAsync()
}
else
{
// Non-durable sender, no native scheduling. In current Wolverine this
// branch is effectively unreachable for non-local transports — the
// routing layer (MessageRoute.WriteEnvelope) already swaps such
// envelopes to the local://durable queue, which means Sender.IsDurable
// would be true above. Kept for defense in depth.
Runtime.Logger.LogDebug("Scheduling envelope {EnvelopeId} ({MessageType}) for in-memory execution (non-durable, no native scheduling) to {Destination}", envelope.Id, envelope.MessageType, envelope.Destination);
Runtime.ScheduleLocalExecutionInMemory(envelope.ScheduledTime!.Value, envelope);
}
Expand Down
13 changes: 13 additions & 0 deletions src/Wolverine/Runtime/WolverineRuntime.HostService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,19 @@ public async Task StartAsync(CancellationToken cancellationToken)

await _stores.Value.InitializeAsync();

// AlwaysMakeScheduledMessagesDurable opts every non-durable scheduled send onto
// the message store inbox; if no store is configured, we silently fall through
// to in-process scheduling (lost on restart) — defeating the policy. Surface
// this as a startup warning so a misconfiguration is observable rather than a
// silent durability gap.
if (Options.Durability.AlwaysMakeScheduledMessagesDurable && Storage is NullMessageStore)
{
Logger.LogWarning(
"Policies.AlwaysMakeScheduledMessagesDurable() is set but no message store is configured. " +
"Scheduled messages will continue to use in-process scheduling and will be lost on restart. " +
"Configure a message store (e.g. PersistMessagesWithPostgresql) to make the policy effective.");
}

if (!Options.ExternalTransportsAreStubbed)
{
foreach (var configuresRuntime in Options.Transports.OfType<ITransportConfiguresRuntime>().ToArray())
Expand Down
Loading
Loading