diff --git a/docs/guide/messaging/transports/nats.md b/docs/guide/messaging/transports/nats.md index 1e7e01dec..975d80152 100644 --- a/docs/guide/messaging/transports/nats.md +++ b/docs/guide/messaging/transports/nats.md @@ -348,10 +348,32 @@ opts.UseNats("nats://localhost:4222") When conditions are met, scheduled messages use NATS headers: - `Nats-Schedule: @at ` -- `Nats-Schedule-Target: ` +- `Nats-Schedule-Target: ` The transport automatically detects server version at startup. +NATS requires the scheduling (control) message to be published to a subject that is **different** from +`Nats-Schedule-Target` — publishing both to the same subject is rejected with +`message schedules target is invalid` (err 10190). Wolverine therefore publishes the control message to a +derived **schedule subject** — the destination subject plus a suffix (default `.scheduled`, e.g. +`orders.created.scheduled`) — while `Nats-Schedule-Target` stays the real destination +(`orders.created`). At the scheduled time the server materializes a new message onto the target subject, +where your listener's consumer receives it; the control message itself is never delivered to consumers. + +Both the target and the derived schedule subject must be covered by the **same stream**. The schedule +subject is the target plus an extra suffix token (`orders.created` → `orders.created.scheduled`), so it +always has one more token than the target. Any filter that only matches the target's token count — an +exact-subject filter, or a `*` pattern such as `orders.*` — therefore covers the target but **not** +`.scheduled`. Cover both with a `>`-style prefix wildcard such as `orders.>`, or list the target +and schedule subjects as explicit filters. Override the suffix per publishing endpoint when needed: + +```csharp +opts.PublishMessage() + .ToNatsSubject("orders.created") + .UseJetStream("ORDERS") + .UseScheduleSubjectSuffix(".deferred"); +``` + ### Fallback Behavior When native scheduled send is not available (server < 2.12 or stream not configured), Wolverine falls back to its database-backed scheduled message persistence. diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/NatsScheduleSubjectSuffixTests.cs b/src/Transports/NATS/Wolverine.Nats.Tests/NatsScheduleSubjectSuffixTests.cs new file mode 100644 index 000000000..bb8cfe619 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats.Tests/NatsScheduleSubjectSuffixTests.cs @@ -0,0 +1,38 @@ +using Shouldly; +using Wolverine.Nats.Configuration; +using Wolverine.Nats.Internal; +using Xunit; + +namespace Wolverine.Nats.Tests; + +public class NatsScheduleSubjectSuffixTests +{ + private static NatsEndpoint EndpointFor(string subject = "orders.created") + { + var transport = new NatsTransport(); + return (NatsEndpoint)transport.GetOrCreateEndpoint(NatsEndpointUri.Subject(subject)); + } + + [Fact] + public void default_suffix_is_scheduled() + { + EndpointFor().ScheduleSubjectSuffix.ShouldBe(".scheduled"); + } + + [Theory] + [InlineData(".scheduled")] + [InlineData(".override-scheduled")] + [InlineData(".control")] + public void use_schedule_subject_suffix_round_trips_to_endpoint(string suffix) + { + var endpoint = EndpointFor(); + var configuration = new NatsSubscriberConfiguration(endpoint); + + configuration.UseScheduleSubjectSuffix(suffix); + // Callbacks are buffered in the IDelayedEndpointConfiguration base; Apply() mirrors what the + // runtime does at endpoint compile time. + ((Wolverine.Configuration.IDelayedEndpointConfiguration)configuration).Apply(); + + endpoint.ScheduleSubjectSuffix.ShouldBe(suffix); + } +} diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/ScheduledMessageDeliveryTests.cs b/src/Transports/NATS/Wolverine.Nats.Tests/ScheduledMessageDeliveryTests.cs new file mode 100644 index 000000000..ca5e7a781 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats.Tests/ScheduledMessageDeliveryTests.cs @@ -0,0 +1,138 @@ +using FluentAssertions; +using JasperFx.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine.Nats.Internal; +using Wolverine.Runtime; +using Wolverine.Tracking; +using Xunit; + +namespace Wolverine.Nats.Tests; + +/// +/// Reproduces the NATS JetStream native scheduled-delivery failure (err 10190). Unlike the compliance +/// schedule_send test, the publishing endpoint here uses .UseJetStream(stream) — that is what +/// engages the native scheduled-send path; without it Wolverine falls back to durable scheduling and the +/// bug is never exercised. +/// +[Collection("NATS Integration Tests")] +[Trait("Category", "Integration")] +public class ScheduledMessageDeliveryTests : IAsyncLifetime +{ + private IHost? _sender; + private IHost? _receiver; + private string _receiverSubject = ""; + private string _streamName = ""; + + public async Task InitializeAsync() + { + var natsUrl = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://localhost:4222"; + + // Unique per run (GUID, not an in-process counter) so repeated runs against a persistent NATS + // instance never collide on stream name / subjects. The stream is torn down in DisposeAsync. + var id = Guid.NewGuid().ToString("N"); + _streamName = $"SCHEDULED_{id}"; + _receiverSubject = $"test.scheduled.{id}.receiver"; + var streamSubjects = $"test.scheduled.{id}.>"; + + _sender = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.ServiceName = "ScheduleSender"; + opts.UseNats(natsUrl) + .AutoProvision() + .UseJetStream(js => js.MaxDeliver = 5) + .DefineWorkQueueStream(_streamName, s => s.EnableScheduledDelivery(), streamSubjects); + + // .UseJetStream on the PUBLISHING endpoint is what forces the native scheduled-send path. + opts.PublishMessage() + .ToNatsSubject(_receiverSubject) + .UseJetStream(_streamName); + }) + .StartAsync(); + + _receiver = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.ServiceName = "ScheduleReceiver"; + opts.UseNats(natsUrl) + .AutoProvision() + .UseJetStream(js => js.MaxDeliver = 5) + .DefineWorkQueueStream(_streamName, s => s.EnableScheduledDelivery(), streamSubjects); + + opts.ListenToNatsSubject(_receiverSubject) + .Named("receiver") + .UseJetStream(_streamName, $"receiver-consumer-{id}"); + }) + .StartAsync(); + } + + public async Task DisposeAsync() + { + // Delete the run's stream while a connection is still open so persistent NATS instances + // don't accumulate long-lived test artifacts. Best-effort: the stream may never have been + // provisioned (e.g. startup failed), so swallow any error. + if (_sender != null && _streamName.IsNotEmpty()) + { + try + { + var runtime = _sender.Services.GetRequiredService(); + var transport = runtime.Options.Transports.GetOrCreate(); + await transport.JetStreamContext.DeleteStreamAsync(_streamName); + } + catch + { + // ignore — best-effort cleanup + } + } + + if (_sender != null) + { + await _sender.StopAsync(); + } + + if (_receiver != null) + { + await _receiver.StopAsync(); + } + + _sender?.Dispose(); + _receiver?.Dispose(); + } + + [Fact] + public async Task schedule_send_over_native_jetstream_scheduling() + { + // Native scheduled delivery requires NATS Server 2.12+. Skip on older images rather than fail. + var runtime = _sender!.Services.GetRequiredService(); + var transport = runtime.Options.Transports.GetOrCreate(); + if (!transport.ServerSupportsScheduledSend) + { + return; + } + + // Guard: confirm the native path is engaged, not the durable-scheduling fallback that would mask the bug. + var agent = runtime.Endpoints.GetOrBuildSendingAgent(new Uri($"nats://subject/{_receiverSubject}")); + agent.SupportsNativeScheduledSend.Should().BeTrue(); + + var message = new ScheduledPing(Guid.NewGuid(), "scheduled"); + + var session = await _sender + .TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .WaitForMessageToBeReceivedAt(_receiver!) + .ExecuteAndWaitAsync(c => c.ScheduleAsync(message, 5.Seconds()).AsTask()); + + session.Received.SingleMessage().Should().BeEquivalentTo(message); + } +} + +public record ScheduledPing(Guid Id, string Text); + +public class ScheduledPingHandler +{ + public void Handle(ScheduledPing message) + { + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Configuration/NatsSubscriberConfiguration.cs b/src/Transports/NATS/Wolverine.Nats/Configuration/NatsSubscriberConfiguration.cs index c62fbb118..d27268a3e 100644 --- a/src/Transports/NATS/Wolverine.Nats/Configuration/NatsSubscriberConfiguration.cs +++ b/src/Transports/NATS/Wolverine.Nats/Configuration/NatsSubscriberConfiguration.cs @@ -25,4 +25,16 @@ public NatsSubscriberConfiguration UseJetStream(string? streamName = null) return this; } + + /// + /// Override the suffix used to derive the NATS JetStream scheduling subject for native scheduled sends + /// (default .scheduled). The derived subject must stay covered by the destination's stream. + /// + public NatsSubscriberConfiguration UseScheduleSubjectSuffix(string suffix) + { + ArgumentException.ThrowIfNullOrWhiteSpace(suffix, nameof(suffix)); + + add(endpoint => endpoint.ScheduleSubjectSuffix = suffix); + return this; + } } diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/JetStreamPublisher.cs b/src/Transports/NATS/Wolverine.Nats/Internal/JetStreamPublisher.cs index 267dbbee6..45e1fc073 100644 --- a/src/Transports/NATS/Wolverine.Nats/Internal/JetStreamPublisher.cs +++ b/src/Transports/NATS/Wolverine.Nats/Internal/JetStreamPublisher.cs @@ -13,11 +13,15 @@ internal class JetStreamPublisher : INatsPublisher private readonly NatsConnection _connection; private readonly INatsJSContext _jetStreamContext; private readonly ILogger _logger; + private readonly string _scheduleSubjectSuffix; - public JetStreamPublisher(NatsConnection connection, ILogger logger) + public JetStreamPublisher(NatsConnection connection, + ILogger logger, + string scheduleSubjectSuffix = ".scheduled") { _connection = connection; _logger = logger; + _scheduleSubjectSuffix = scheduleSubjectSuffix; _jetStreamContext = connection.CreateJetStreamContext(); } @@ -70,28 +74,33 @@ await _connection.PublishAsync( } else { - // Check if this is a scheduled message + var publishSubject = subject; + if (envelope.ScheduledTime.HasValue) { - // Add NATS scheduling headers - // Format: @at + // NATS rejects a scheduled publish whose subject equals Nats-Schedule-Target ("message + // schedules target is invalid", err 10190). So the target stays the real destination + // (where the consumer listens and the server materializes the message), and the control + // message goes to a derived subject that must still be covered by the same stream. var scheduledTime = envelope.ScheduledTime.Value.ToUniversalTime(); headers["Nats-Schedule"] = $"@at {scheduledTime:O}"; headers["Nats-Schedule-Target"] = subject; - + publishSubject = subject + _scheduleSubjectSuffix; + if (_logger.IsEnabled(LogLevel.Debug)) { _logger.LogDebug( - "Scheduling message {MessageId} for delivery at {ScheduledTime} to {Subject}", + "Scheduling message {MessageId} for delivery at {ScheduledTime} to {Target} via schedule subject {ScheduleSubject}", envelope.Id, scheduledTime, - subject + subject, + publishSubject ); } } - + var ack = await _jetStreamContext.PublishAsync( - subject, + publishSubject, data, headers: headers, cancellationToken: cancellation diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/NatsEndpoint.cs b/src/Transports/NATS/Wolverine.Nats/Internal/NatsEndpoint.cs index 4250b196e..319585045 100644 --- a/src/Transports/NATS/Wolverine.Nats/Internal/NatsEndpoint.cs +++ b/src/Transports/NATS/Wolverine.Nats/Internal/NatsEndpoint.cs @@ -47,6 +47,13 @@ public NatsEndpoint(string subject, NatsTransport transport, EndpointRole role) public string? DeadLetterSubject { get; set; } public int MaxDeliveryAttempts { get; set; } = 5; + /// + /// Suffix appended to the destination subject to form the NATS JetStream scheduling subject for native + /// scheduled sends. Must keep the schedule subject covered by the stream (e.g. a prefix.> + /// filter covers {subject}.scheduled). Defaults to .scheduled. + /// + public string ScheduleSubjectSuffix { get; set; } = ".scheduled"; + /// /// Per-endpoint override for the JetStream consumer's DeliverPolicy. /// When non-null this wins over diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/NatsSender.cs b/src/Transports/NATS/Wolverine.Nats/Internal/NatsSender.cs index 4333abe1e..3d6207910 100644 --- a/src/Transports/NATS/Wolverine.Nats/Internal/NatsSender.cs +++ b/src/Transports/NATS/Wolverine.Nats/Internal/NatsSender.cs @@ -42,7 +42,7 @@ bool supportsNativeScheduledSend ) { INatsPublisher publisher = useJetStream - ? new JetStreamPublisher(connection, logger) + ? new JetStreamPublisher(connection, logger, endpoint.ScheduleSubjectSuffix) : new CoreNatsPublisher(connection, logger); return new NatsSender(endpoint, publisher, logger, mapper, cancellation, supportsNativeScheduledSend);