Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion docs/guide/messaging/transports/nats.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,32 @@ opts.UseNats("nats://localhost:4222")

When conditions are met, scheduled messages use NATS headers:
- `Nats-Schedule: @at <RFC3339 timestamp>`
- `Nats-Schedule-Target: <subject>`
- `Nats-Schedule-Target: <destination subject>`

The transport automatically detects server version at startup.

NATS requires the scheduling (control) message to be published to a subject that is **different** from
`Nats-Schedule-Target` — publishing both to the same subject is rejected with
`message schedules target is invalid` (err 10190). Wolverine therefore publishes the control message to a
derived **schedule subject** — the destination subject plus a suffix (default `.scheduled`, e.g.
`orders.created.scheduled`) — while `Nats-Schedule-Target` stays the real destination
(`orders.created`). At the scheduled time the server materializes a new message onto the target subject,
where your listener's consumer receives it; the control message itself is never delivered to consumers.

Both the target and the derived schedule subject must be covered by the **same stream**. The schedule
subject is the target plus an extra suffix token (`orders.created` → `orders.created.scheduled`), so it
always has one more token than the target. Any filter that only matches the target's token count — an
exact-subject filter, or a `*` pattern such as `orders.*` — therefore covers the target but **not**
`<subject>.scheduled`. Cover both with a `>`-style prefix wildcard such as `orders.>`, or list the target
and schedule subjects as explicit filters. Override the suffix per publishing endpoint when needed:

```csharp
opts.PublishMessage<OrderCreated>()
.ToNatsSubject("orders.created")
.UseJetStream("ORDERS")
.UseScheduleSubjectSuffix(".deferred");
```

### Fallback Behavior

When native scheduled send is not available (server < 2.12 or stream not configured), Wolverine falls back to its database-backed scheduled message persistence.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using Shouldly;
using Wolverine.Nats.Configuration;
using Wolverine.Nats.Internal;
using Xunit;

namespace Wolverine.Nats.Tests;

public class NatsScheduleSubjectSuffixTests
{
private static NatsEndpoint EndpointFor(string subject = "orders.created")
{
var transport = new NatsTransport();
return (NatsEndpoint)transport.GetOrCreateEndpoint(NatsEndpointUri.Subject(subject));
}

[Fact]
public void default_suffix_is_scheduled()
{
EndpointFor().ScheduleSubjectSuffix.ShouldBe(".scheduled");
}

[Theory]
[InlineData(".scheduled")]
[InlineData(".override-scheduled")]
[InlineData(".control")]
public void use_schedule_subject_suffix_round_trips_to_endpoint(string suffix)
{
var endpoint = EndpointFor();
var configuration = new NatsSubscriberConfiguration(endpoint);

configuration.UseScheduleSubjectSuffix(suffix);
// Callbacks are buffered in the IDelayedEndpointConfiguration base; Apply() mirrors what the
// runtime does at endpoint compile time.
((Wolverine.Configuration.IDelayedEndpointConfiguration)configuration).Apply();

endpoint.ScheduleSubjectSuffix.ShouldBe(suffix);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
using FluentAssertions;
using JasperFx.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Wolverine.Nats.Internal;
using Wolverine.Runtime;
using Wolverine.Tracking;
using Xunit;

namespace Wolverine.Nats.Tests;

/// <summary>
/// Reproduces the NATS JetStream native scheduled-delivery failure (err 10190). Unlike the compliance
/// <c>schedule_send</c> test, the publishing endpoint here uses <c>.UseJetStream(stream)</c> — that is what
/// engages the native scheduled-send path; without it Wolverine falls back to durable scheduling and the
/// bug is never exercised.
/// </summary>
[Collection("NATS Integration Tests")]
[Trait("Category", "Integration")]
public class ScheduledMessageDeliveryTests : IAsyncLifetime
{
private IHost? _sender;
private IHost? _receiver;
private string _receiverSubject = "";
private string _streamName = "";

public async Task InitializeAsync()
{
var natsUrl = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://localhost:4222";

// Unique per run (GUID, not an in-process counter) so repeated runs against a persistent NATS
// instance never collide on stream name / subjects. The stream is torn down in DisposeAsync.
var id = Guid.NewGuid().ToString("N");
_streamName = $"SCHEDULED_{id}";
_receiverSubject = $"test.scheduled.{id}.receiver";
var streamSubjects = $"test.scheduled.{id}.>";

_sender = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.ServiceName = "ScheduleSender";
opts.UseNats(natsUrl)
.AutoProvision()
.UseJetStream(js => js.MaxDeliver = 5)
.DefineWorkQueueStream(_streamName, s => s.EnableScheduledDelivery(), streamSubjects);

// .UseJetStream on the PUBLISHING endpoint is what forces the native scheduled-send path.
opts.PublishMessage<ScheduledPing>()
.ToNatsSubject(_receiverSubject)
.UseJetStream(_streamName);
})
.StartAsync();

_receiver = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.ServiceName = "ScheduleReceiver";
opts.UseNats(natsUrl)
.AutoProvision()
.UseJetStream(js => js.MaxDeliver = 5)
.DefineWorkQueueStream(_streamName, s => s.EnableScheduledDelivery(), streamSubjects);

opts.ListenToNatsSubject(_receiverSubject)
.Named("receiver")
.UseJetStream(_streamName, $"receiver-consumer-{id}");
})
.StartAsync();
}

public async Task DisposeAsync()
{
// Delete the run's stream while a connection is still open so persistent NATS instances
// don't accumulate long-lived test artifacts. Best-effort: the stream may never have been
// provisioned (e.g. startup failed), so swallow any error.
if (_sender != null && _streamName.IsNotEmpty())
{
try
{
var runtime = _sender.Services.GetRequiredService<IWolverineRuntime>();
var transport = runtime.Options.Transports.GetOrCreate<NatsTransport>();
await transport.JetStreamContext.DeleteStreamAsync(_streamName);
}
catch
{
// ignore — best-effort cleanup
}
}

if (_sender != null)
{
await _sender.StopAsync();
}

if (_receiver != null)
{
await _receiver.StopAsync();
}

_sender?.Dispose();
_receiver?.Dispose();
}

[Fact]
public async Task schedule_send_over_native_jetstream_scheduling()
{
// Native scheduled delivery requires NATS Server 2.12+. Skip on older images rather than fail.
var runtime = _sender!.Services.GetRequiredService<IWolverineRuntime>();
var transport = runtime.Options.Transports.GetOrCreate<NatsTransport>();
if (!transport.ServerSupportsScheduledSend)
{
return;
}

// Guard: confirm the native path is engaged, not the durable-scheduling fallback that would mask the bug.
var agent = runtime.Endpoints.GetOrBuildSendingAgent(new Uri($"nats://subject/{_receiverSubject}"));
agent.SupportsNativeScheduledSend.Should().BeTrue();

var message = new ScheduledPing(Guid.NewGuid(), "scheduled");

var session = await _sender
.TrackActivity()
.AlsoTrack(_receiver)
.Timeout(30.Seconds())
.WaitForMessageToBeReceivedAt<ScheduledPing>(_receiver!)
.ExecuteAndWaitAsync(c => c.ScheduleAsync(message, 5.Seconds()).AsTask());

session.Received.SingleMessage<ScheduledPing>().Should().BeEquivalentTo(message);
}
}

public record ScheduledPing(Guid Id, string Text);

public class ScheduledPingHandler
{
public void Handle(ScheduledPing message)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,16 @@ public NatsSubscriberConfiguration UseJetStream(string? streamName = null)

return this;
}

/// <summary>
/// Override the suffix used to derive the NATS JetStream scheduling subject for native scheduled sends
/// (default <c>.scheduled</c>). The derived subject must stay covered by the destination's stream.
/// </summary>
public NatsSubscriberConfiguration UseScheduleSubjectSuffix(string suffix)
{
ArgumentException.ThrowIfNullOrWhiteSpace(suffix, nameof(suffix));

add(endpoint => endpoint.ScheduleSubjectSuffix = suffix);
return this;
}
Comment thread
midub marked this conversation as resolved.
}
27 changes: 18 additions & 9 deletions src/Transports/NATS/Wolverine.Nats/Internal/JetStreamPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ internal class JetStreamPublisher : INatsPublisher
private readonly NatsConnection _connection;
private readonly INatsJSContext _jetStreamContext;
private readonly ILogger<NatsEndpoint> _logger;
private readonly string _scheduleSubjectSuffix;

public JetStreamPublisher(NatsConnection connection, ILogger<NatsEndpoint> logger)
public JetStreamPublisher(NatsConnection connection,
ILogger<NatsEndpoint> logger,
string scheduleSubjectSuffix = ".scheduled")
{
_connection = connection;
_logger = logger;
_scheduleSubjectSuffix = scheduleSubjectSuffix;
_jetStreamContext = connection.CreateJetStreamContext();
}

Expand Down Expand Up @@ -70,28 +74,33 @@ await _connection.PublishAsync(
}
else
{
// Check if this is a scheduled message
var publishSubject = subject;

if (envelope.ScheduledTime.HasValue)
{
// Add NATS scheduling headers
// Format: @at <RFC3339 timestamp>
// NATS rejects a scheduled publish whose subject equals Nats-Schedule-Target ("message
// schedules target is invalid", err 10190). So the target stays the real destination
// (where the consumer listens and the server materializes the message), and the control
// message goes to a derived subject that must still be covered by the same stream.
var scheduledTime = envelope.ScheduledTime.Value.ToUniversalTime();
headers["Nats-Schedule"] = $"@at {scheduledTime:O}";
headers["Nats-Schedule-Target"] = subject;

publishSubject = subject + _scheduleSubjectSuffix;

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug(
"Scheduling message {MessageId} for delivery at {ScheduledTime} to {Subject}",
"Scheduling message {MessageId} for delivery at {ScheduledTime} to {Target} via schedule subject {ScheduleSubject}",
envelope.Id,
scheduledTime,
subject
subject,
publishSubject
);
}
}

var ack = await _jetStreamContext.PublishAsync(
subject,
publishSubject,
data,
headers: headers,
cancellationToken: cancellation
Expand Down
7 changes: 7 additions & 0 deletions src/Transports/NATS/Wolverine.Nats/Internal/NatsEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ public NatsEndpoint(string subject, NatsTransport transport, EndpointRole role)
public string? DeadLetterSubject { get; set; }
public int MaxDeliveryAttempts { get; set; } = 5;

/// <summary>
/// Suffix appended to the destination subject to form the NATS JetStream scheduling subject for native
/// scheduled sends. Must keep the schedule subject covered by the stream (e.g. a <c>prefix.&gt;</c>
/// filter covers <c>{subject}.scheduled</c>). Defaults to <c>.scheduled</c>.
/// </summary>
public string ScheduleSubjectSuffix { get; set; } = ".scheduled";

/// <summary>
/// Per-endpoint override for the JetStream consumer's <c>DeliverPolicy</c>.
/// When non-null this wins over
Expand Down
2 changes: 1 addition & 1 deletion src/Transports/NATS/Wolverine.Nats/Internal/NatsSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ bool supportsNativeScheduledSend
)
{
INatsPublisher publisher = useJetStream
? new JetStreamPublisher(connection, logger)
? new JetStreamPublisher(connection, logger, endpoint.ScheduleSubjectSuffix)
: new CoreNatsPublisher(connection, logger);

return new NatsSender(endpoint, publisher, logger, mapper, cancellation, supportsNativeScheduledSend);
Expand Down