Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion docs/guide/messaging/transports/nats.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,29 @@ opts.UseNats("nats://localhost:4222")

When conditions are met, scheduled messages use NATS headers:
- `Nats-Schedule: @at <RFC3339 timestamp>`
- `Nats-Schedule-Target: <subject>`
- `Nats-Schedule-Target: <destination subject>`

The transport automatically detects server version at startup.

NATS requires the scheduling (control) message to be published to a subject that is **different** from
`Nats-Schedule-Target` — publishing both to the same subject is rejected with
`message schedules target is invalid` (err 10190). Wolverine therefore publishes the control message to a
derived **schedule subject** — the destination subject plus a suffix (default `.scheduled`, e.g.
`orders.created.scheduled`) — while `Nats-Schedule-Target` stays the real destination
(`orders.created`). At the scheduled time the server materializes a new message onto the target subject,
where your listener's consumer receives it; the control message itself is never delivered to consumers.

Both the schedule subject and the target must be covered by the **same stream**, so use a wildcard subject
filter such as `orders.>` (a single-token `*` filter or an exact-subject filter will **not** cover
`<subject>.scheduled`). Override the suffix per publishing endpoint when needed:
Comment thread
midub marked this conversation as resolved.
Outdated

```csharp
opts.PublishMessage<OrderCreated>()
.ToNatsSubject("orders.created")
.UseJetStream("ORDERS")
.UseScheduleSubjectSuffix(".deferred");
```

### Fallback Behavior

When native scheduled send is not available (server < 2.12 or stream not configured), Wolverine falls back to its database-backed scheduled message persistence.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using Shouldly;
using Wolverine.Nats.Configuration;
using Wolverine.Nats.Internal;
using Xunit;

namespace Wolverine.Nats.Tests;

public class NatsScheduleSubjectSuffixTests
{
private static NatsEndpoint EndpointFor(string subject = "orders.created")
{
var transport = new NatsTransport();
return (NatsEndpoint)transport.GetOrCreateEndpoint(NatsEndpointUri.Subject(subject));
}

[Fact]
public void default_suffix_is_scheduled()
{
EndpointFor().ScheduleSubjectSuffix.ShouldBe(".scheduled");
}

[Theory]
[InlineData(".scheduled")]
[InlineData(".override-scheduled")]
[InlineData(".control")]
public void use_schedule_subject_suffix_round_trips_to_endpoint(string suffix)
{
var endpoint = EndpointFor();
var configuration = new NatsSubscriberConfiguration(endpoint);

configuration.UseScheduleSubjectSuffix(suffix);
// Callbacks are buffered in the IDelayedEndpointConfiguration base; Apply() mirrors what the
// runtime does at endpoint compile time.
((Wolverine.Configuration.IDelayedEndpointConfiguration)configuration).Apply();

endpoint.ScheduleSubjectSuffix.ShouldBe(suffix);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
using FluentAssertions;
using JasperFx.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Wolverine.Nats.Internal;
using Wolverine.Runtime;
using Wolverine.Tracking;
using Xunit;

namespace Wolverine.Nats.Tests;

/// <summary>
/// Reproduces the NATS JetStream native scheduled-delivery failure (err 10190). Unlike the compliance
/// <c>schedule_send</c> test, the publishing endpoint here uses <c>.UseJetStream(stream)</c> — that is what
/// engages the native scheduled-send path; without it Wolverine falls back to durable scheduling and the
/// bug is never exercised.
/// </summary>
[Collection("NATS Integration Tests")]
[Trait("Category", "Integration")]
public class ScheduledMessageDeliveryTests : IAsyncLifetime
{
private static int _counter;
private IHost? _sender;
private IHost? _receiver;
private string _receiverSubject = "";

public async Task InitializeAsync()
{
var natsUrl = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://localhost:4222";

var n = ++_counter;
var streamName = $"SCHEDULED_{n}";
_receiverSubject = $"test.scheduled.{n}.receiver";
var streamSubjects = $"test.scheduled.{n}.>";
Comment thread
midub marked this conversation as resolved.
Outdated

_sender = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.ServiceName = "ScheduleSender";
opts.UseNats(natsUrl)
.AutoProvision()
.UseJetStream(js => js.MaxDeliver = 5)
.DefineWorkQueueStream(streamName, s => s.EnableScheduledDelivery(), streamSubjects);

// .UseJetStream on the PUBLISHING endpoint is what forces the native scheduled-send path.
opts.PublishMessage<ScheduledPing>()
.ToNatsSubject(_receiverSubject)
.UseJetStream(streamName);
})
.StartAsync();

_receiver = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.ServiceName = "ScheduleReceiver";
opts.UseNats(natsUrl)
.AutoProvision()
.UseJetStream(js => js.MaxDeliver = 5)
.DefineWorkQueueStream(streamName, s => s.EnableScheduledDelivery(), streamSubjects);

opts.ListenToNatsSubject(_receiverSubject)
.Named("receiver")
.UseJetStream(streamName, $"receiver-consumer-{n}");
})
.StartAsync();
}

public async Task DisposeAsync()
{
if (_sender != null)
{
await _sender.StopAsync();
}

if (_receiver != null)
{
await _receiver.StopAsync();
}

_sender?.Dispose();
_receiver?.Dispose();
}

[Fact]
public async Task schedule_send_over_native_jetstream_scheduling()
{
// Native scheduled delivery requires NATS Server 2.12+. Skip on older images rather than fail.
var runtime = _sender!.Services.GetRequiredService<IWolverineRuntime>();
var transport = runtime.Options.Transports.GetOrCreate<NatsTransport>();
if (!transport.ServerSupportsScheduledSend)
{
return;
}

// Guard: confirm the native path is engaged, not the durable-scheduling fallback that would mask the bug.
var agent = runtime.Endpoints.GetOrBuildSendingAgent(new Uri($"nats://subject/{_receiverSubject}"));
agent.SupportsNativeScheduledSend.Should().BeTrue();

var message = new ScheduledPing(Guid.NewGuid(), "scheduled");

var session = await _sender
.TrackActivity()
.AlsoTrack(_receiver)
.Timeout(30.Seconds())
.WaitForMessageToBeReceivedAt<ScheduledPing>(_receiver!)
.ExecuteAndWaitAsync(c => c.ScheduleAsync(message, 5.Seconds()).AsTask());

session.Received.SingleMessage<ScheduledPing>().Should().BeEquivalentTo(message);
}
}

public record ScheduledPing(Guid Id, string Text);

public class ScheduledPingHandler
{
public void Handle(ScheduledPing message)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,14 @@ public NatsSubscriberConfiguration UseJetStream(string? streamName = null)

return this;
}

/// <summary>
/// Override the suffix used to derive the NATS JetStream scheduling subject for native scheduled sends
/// (default <c>.scheduled</c>). The derived subject must stay covered by the destination's stream.
/// </summary>
public NatsSubscriberConfiguration UseScheduleSubjectSuffix(string suffix)
{
add(endpoint => endpoint.ScheduleSubjectSuffix = suffix);
return this;
}
Comment thread
midub marked this conversation as resolved.
}
28 changes: 19 additions & 9 deletions src/Transports/NATS/Wolverine.Nats/Internal/JetStreamPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@ internal class JetStreamPublisher : INatsPublisher
private readonly NatsConnection _connection;
private readonly INatsJSContext _jetStreamContext;
private readonly ILogger<NatsEndpoint> _logger;
private readonly string _scheduleSubjectSuffix;

public JetStreamPublisher(NatsConnection connection, ILogger<NatsEndpoint> logger)
public JetStreamPublisher(NatsConnection connection,
ILogger<NatsEndpoint> logger,
string scheduleSubjectSuffix = ".scheduled")
{
_connection = connection;
_logger = logger;
// An empty suffix would make the schedule subject equal the target and re-trigger err 10190.
_scheduleSubjectSuffix = string.IsNullOrWhiteSpace(scheduleSubjectSuffix) ? ".scheduled" : scheduleSubjectSuffix;
_jetStreamContext = connection.CreateJetStreamContext();
}

Expand Down Expand Up @@ -70,28 +75,33 @@ await _connection.PublishAsync(
}
else
{
// Check if this is a scheduled message
var publishSubject = subject;

if (envelope.ScheduledTime.HasValue)
{
// Add NATS scheduling headers
// Format: @at <RFC3339 timestamp>
// NATS rejects a scheduled publish whose subject equals Nats-Schedule-Target ("message
// schedules target is invalid", err 10190). So the target stays the real destination
// (where the consumer listens and the server materializes the message), and the control
// message goes to a derived subject that must still be covered by the same stream.
var scheduledTime = envelope.ScheduledTime.Value.ToUniversalTime();
headers["Nats-Schedule"] = $"@at {scheduledTime:O}";
headers["Nats-Schedule-Target"] = subject;

publishSubject = subject + _scheduleSubjectSuffix;

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug(
"Scheduling message {MessageId} for delivery at {ScheduledTime} to {Subject}",
"Scheduling message {MessageId} for delivery at {ScheduledTime} to {Target} via schedule subject {ScheduleSubject}",
envelope.Id,
scheduledTime,
subject
subject,
publishSubject
);
}
}

var ack = await _jetStreamContext.PublishAsync(
subject,
publishSubject,
data,
headers: headers,
cancellationToken: cancellation
Expand Down
7 changes: 7 additions & 0 deletions src/Transports/NATS/Wolverine.Nats/Internal/NatsEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ public NatsEndpoint(string subject, NatsTransport transport, EndpointRole role)
public string? DeadLetterSubject { get; set; }
public int MaxDeliveryAttempts { get; set; } = 5;

/// <summary>
/// Suffix appended to the destination subject to form the NATS JetStream scheduling subject for native
/// scheduled sends. Must keep the schedule subject covered by the stream (e.g. a <c>prefix.&gt;</c>
/// filter covers <c>{subject}.scheduled</c>). Defaults to <c>.scheduled</c>.
/// </summary>
public string ScheduleSubjectSuffix { get; set; } = ".scheduled";

/// <summary>
/// Per-endpoint override for the JetStream consumer's <c>DeliverPolicy</c>.
/// When non-null this wins over
Expand Down
2 changes: 1 addition & 1 deletion src/Transports/NATS/Wolverine.Nats/Internal/NatsSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ bool supportsNativeScheduledSend
)
{
INatsPublisher publisher = useJetStream
? new JetStreamPublisher(connection, logger)
? new JetStreamPublisher(connection, logger, endpoint.ScheduleSubjectSuffix)
: new CoreNatsPublisher(connection, logger);

return new NatsSender(endpoint, publisher, logger, mapper, cancellation, supportsNativeScheduledSend);
Expand Down