diff --git a/docker-compose.yml b/docker-compose.yml index ee60faec3..e43456373 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -92,4 +92,11 @@ services: image: "redis:alpine" command: redis-server ports: - - "6379:6379" \ No newline at end of file + - "6379:6379" + + nats: + image: "nats:latest" + ports: + - "4222:4222" + - "8222:8222" + command: ["--jetstream", "-m", "8222"] \ No newline at end of file diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index d2784c22e..61d773537 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -186,6 +186,7 @@ const config: UserConfig = { {text: 'Sql Server', link: '/guide/messaging/transports/sqlserver'}, {text: 'PostgreSQL', link: '/guide/messaging/transports/postgresql'}, {text: 'MQTT', link: '/guide/messaging/transports/mqtt'}, + {text: 'NATS', link: '/guide/messaging/transports/nats'}, {text: 'Kafka', link: '/guide/messaging/transports/kafka'}, {text: 'SignalR', link: '/guide/messaging/transports/signalr'}, {text: 'Redis', link: '/guide/messaging/transports/redis'}, diff --git a/docs/guide/messaging/transports/nats.md b/docs/guide/messaging/transports/nats.md new file mode 100644 index 000000000..c6a3dd0f5 --- /dev/null +++ b/docs/guide/messaging/transports/nats.md @@ -0,0 +1,418 @@ +# Using NATS + +::: tip +Wolverine uses the official [NATS.Net client](https://github.com/nats-io/nats.net) to connect to NATS. +::: + +## Installing + +To use [NATS](https://nats.io/) as a messaging transport with Wolverine, first install the `WolverineFx.Nats` library via NuGet: + +```bash +dotnet add package WolverineFx.Nats +``` + +## Core NATS vs JetStream + +NATS provides two distinct messaging models: + +| Feature | Core NATS | JetStream | +|---------|-----------|-----------| +| **Persistence** | None (memory only) | Configurable (memory/file) | +| **Delivery Guarantee** | At-most-once | At-least-once | +| **Acknowledgments** | None | Full support (ack/nak/term) | +| **Requeue** | Via republish | Native via `NakAsync()` | +| **Dead Letter** | Not available | Via `AckTerminateAsync()` | +| **Scheduled Delivery** | Not available | Native (Server 2.12+) | + +Choose **Core NATS** for: +- Real-time notifications where message loss is acceptable +- Low-latency fire-and-forget messaging +- Heartbeats and ephemeral events + +Choose **JetStream** for: +- Commands and events requiring durability +- Workflows where message delivery must be guaranteed +- Scenarios requiring replay or scheduled delivery + +## Basic Configuration + +### Core NATS (Simple Pub/Sub) + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // Connect to NATS + opts.UseNats("nats://localhost:4222") + .AutoProvision(); + + // Listen to a subject + opts.ListenToNatsSubject("orders.received") + .ProcessInline(); + + // Publish to a subject + opts.PublishAllMessages() + .ToNatsSubject("orders.received"); + }).StartAsync(); +``` + +### JetStream (Durable Messaging) + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseNats("nats://localhost:4222") + .AutoProvision() + .UseJetStream() + .DefineWorkQueueStream("ORDERS", "orders.>"); + + // Listen with JetStream consumer + opts.ListenToNatsSubject("orders.received") + .UseJetStream("ORDERS", "orders-consumer"); + + // Publishing automatically uses JetStream when stream is defined + opts.PublishAllMessages() + .ToNatsSubject("orders.received"); + }).StartAsync(); +``` + +## Connection Configuration + +### Basic Connection + +```csharp +opts.UseNats("nats://localhost:4222"); +``` + +### Connection with Timeouts + +```csharp +opts.UseNats("nats://localhost:4222") + .ConfigureTimeouts( + connectTimeout: TimeSpan.FromSeconds(10), + requestTimeout: TimeSpan.FromSeconds(30) + ); +``` + +## Authentication + +### Username and Password + +```csharp +opts.UseNats("nats://localhost:4222") + .WithCredentials("username", "password"); +``` + +### Token Authentication + +```csharp +opts.UseNats("nats://localhost:4222") + .WithToken("my-secret-token"); +``` + +### NKey Authentication + +```csharp +opts.UseNats("nats://localhost:4222") + .WithNKey("/path/to/nkey.file"); +``` + +### TLS Configuration + +```csharp +opts.UseNats("nats://localhost:4222") + .UseTls(insecureSkipVerify: false); +``` + +## JetStream Configuration + +### Configuring JetStream Defaults + +```csharp +opts.UseNats("nats://localhost:4222") + .UseJetStream(js => + { + js.MaxDeliver = 5; // Max redelivery attempts + js.AckWait = TimeSpan.FromSeconds(30); + js.DuplicateWindow = TimeSpan.FromMinutes(2); + }); +``` + +### Defining Streams + +#### Work Queue Stream (Retention by Interest) + +```csharp +opts.UseNats("nats://localhost:4222") + .DefineWorkQueueStream("ORDERS", "orders.>"); +``` + +#### Work Queue with Additional Configuration + +```csharp +opts.UseNats("nats://localhost:4222") + .DefineWorkQueueStream("ORDERS", + stream => stream.EnableScheduledDelivery(), + "orders.>"); +``` + +#### Custom Stream Configuration + +```csharp +opts.UseNats("nats://localhost:4222") + .DefineStream("EVENTS", stream => + { + stream.WithSubjects("events.>") + .WithLimits(maxMessages: 1_000_000, maxAge: TimeSpan.FromDays(7)) + .WithReplicas(3) + .EnableScheduledDelivery(); + }); +``` + +#### Log Stream (Time-Based Retention) + +```csharp +opts.UseNats("nats://localhost:4222") + .DefineLogStream("LOGS", TimeSpan.FromDays(30), "logs.>"); +``` + +#### Replicated Stream (High Availability) + +```csharp +opts.UseNats("nats://localhost:4222") + .DefineReplicatedStream("CRITICAL", replicas: 3, "critical.>"); +``` + +### JetStream Domain + +For multi-tenant or leaf node configurations: + +```csharp +opts.UseNats("nats://localhost:4222") + .UseJetStreamDomain("my-domain"); +``` + +## Listening to Messages + +### Inline Processing + +Messages are processed immediately on the NATS subscription thread: + +```csharp +opts.ListenToNatsSubject("orders.received") + .ProcessInline(); +``` + +### Buffered Processing + +Messages are queued in memory and processed by worker threads: + +```csharp +opts.ListenToNatsSubject("orders.received") + .BufferedInMemory(); +``` + +### JetStream Consumer + +```csharp +opts.ListenToNatsSubject("orders.received") + .UseJetStream("ORDERS", "my-consumer"); +``` + +### Named Endpoints + +```csharp +opts.ListenToNatsSubject("orders.received") + .Named("orders-listener"); +``` + +## Publishing Messages + +### To a Specific Subject + +```csharp +opts.PublishMessage() + .ToNatsSubject("orders.created"); +``` + +### All Messages to a Subject + +```csharp +opts.PublishAllMessages() + .ToNatsSubject("events"); +``` + +### Inline Sending + +Send messages synchronously without buffering: + +```csharp +opts.PublishAllMessages() + .ToNatsSubject("orders") + .SendInline(); +``` + +## Scheduled Message Delivery + +NATS Server 2.12+ supports native scheduled message delivery. When enabled, Wolverine uses NATS headers for scheduling instead of database persistence. + +### Requirements + +1. NATS Server version >= 2.12 +2. Stream configured with `EnableScheduledDelivery()` + +### Configuration + +```csharp +opts.UseNats("nats://localhost:4222") + .UseJetStream() + .DefineWorkQueueStream("ORDERS", + s => s.EnableScheduledDelivery(), + "orders.>"); +``` + +### How It Works + +When conditions are met, scheduled messages use NATS headers: +- `Nats-Schedule: @at ` +- `Nats-Schedule-Target: ` + +The transport automatically detects server version at startup. + +### Fallback Behavior + +When native scheduled send is not available (server < 2.12 or stream not configured), Wolverine falls back to its database-backed scheduled message persistence. + +## Multi-Tenancy + +NATS transport supports subject-based tenant isolation. + +### Basic Multi-Tenancy + +```csharp +opts.UseNats("nats://localhost:4222") + .ConfigureMultiTenancy(TenantedIdBehavior.RequireTenantId) + .AddTenant("tenant-a") + .AddTenant("tenant-b"); +``` + +### Tenant Behavior Options + +- `RequireTenantId`: Throws if tenant ID is missing +- `FallbackToDefault`: Uses base subject if tenant ID is missing + +### Custom Subject Mapper + +```csharp +public class MyTenantMapper : ITenantSubjectMapper +{ + public string MapSubjectForTenant(string baseSubject, string tenantId) + => $"{tenantId}.{baseSubject}"; + + public string? ExtractTenantId(string subject) + => subject.Split('.').FirstOrDefault(); + + public string GetSubscriptionPattern(string baseSubject) + => $"*.{baseSubject}"; +} + +opts.UseNats("nats://localhost:4222") + .UseTenantSubjectMapper(new MyTenantMapper()); +``` + +## Request-Reply + +Wolverine's request-reply pattern works with NATS: + +```csharp +// Send and wait for response +var response = await bus.InvokeAsync(new CreateOrder(...)); +``` + +The response endpoint always uses Core NATS for low-latency replies, even when the main endpoints use JetStream. + +## Error Handling + +### JetStream + +- **Retry**: Message is requeued via `NakAsync()` with optional delay +- **Dead Letter**: Message is terminated via `AckTerminateAsync()` + +### Core NATS + +- **Retry**: Message is republished to the subject +- **Dead Letter**: Handled by Wolverine's error handling policies + +## Auto-Provisioning + +Enable automatic creation of streams and consumers: + +```csharp +opts.UseNats("nats://localhost:4222") + .AutoProvision(); +``` + +Or use resource setup on startup: + +```csharp +opts.Services.AddResourceSetupOnStartup(); +``` + +## Subject Prefix + +Add a prefix to all NATS subjects: + +```csharp +opts.UseNats("nats://localhost:4222") + .WithSubjectPrefix("myapp"); + +// Subject "orders" becomes "myapp.orders" +``` + +## Complete Example + +```csharp +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseNats("nats://localhost:4222") + .AutoProvision() + .WithCredentials("user", "pass") + .UseJetStream(js => + { + js.MaxDeliver = 5; + js.AckWait = TimeSpan.FromSeconds(30); + }) + .DefineWorkQueueStream("ORDERS", + s => s.EnableScheduledDelivery(), + "orders.>"); + + // Listen to orders with JetStream durability + opts.ListenToNatsSubject("orders.received") + .UseJetStream("ORDERS", "order-processor") + .Named("order-listener"); + + // Publish order events + opts.PublishMessage() + .ToNatsSubject("orders.created"); + + opts.PublishMessage() + .ToNatsSubject("orders.shipped"); + + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); +``` + +## Testing + +To run tests locally: + +```bash +# Start NATS with JetStream +docker run -d --name nats -p 4222:4222 -p 8222:8222 nats:latest --jetstream -m 8222 + +# For scheduled delivery tests, use NATS 2.12+ +docker run -d --name nats -p 4222:4222 -p 8222:8222 nats:2.12-alpine --jetstream -m 8222 +``` diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/BasicTests.cs b/src/Transports/NATS/Wolverine.Nats.Tests/BasicTests.cs new file mode 100644 index 000000000..9551c3296 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats.Tests/BasicTests.cs @@ -0,0 +1,19 @@ +using Wolverine.Nats.Configuration; +using Xunit; +using FluentAssertions; + +namespace Wolverine.Nats.Tests; + +public class BasicTests +{ + [Fact] + public void Transport_configuration_has_sensible_defaults() + { + var config = new NatsTransportConfiguration(); + + config.ConnectionString.Should().Be("nats://localhost:4222"); + config.ConnectTimeout.Should().Be(TimeSpan.FromSeconds(10)); + config.RequestTimeout.Should().Be(TimeSpan.FromSeconds(30)); + config.EnableJetStream.Should().BeTrue(); + } +} diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/Helpers/TestCollections.cs b/src/Transports/NATS/Wolverine.Nats.Tests/Helpers/TestCollections.cs new file mode 100644 index 000000000..462a3e1b9 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats.Tests/Helpers/TestCollections.cs @@ -0,0 +1,18 @@ +using Xunit; + +namespace Wolverine.Nats.Tests.Helpers; + +[CollectionDefinition("NATS Integration Tests")] +public class NatsIntegrationTestCollection : ICollectionFixture +{ +} + +[CollectionDefinition("NATS MultiTenancy Tests")] +public class NatsMultiTenancyTestCollection : ICollectionFixture +{ +} + +[CollectionDefinition("NATS Compliance", DisableParallelization = true)] +public class NatsComplianceTestCollection : ICollectionFixture +{ +} \ No newline at end of file diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/Helpers/XunitLogger.cs b/src/Transports/NATS/Wolverine.Nats.Tests/Helpers/XunitLogger.cs new file mode 100644 index 000000000..6e77be1a0 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats.Tests/Helpers/XunitLogger.cs @@ -0,0 +1,82 @@ +using Microsoft.Extensions.Logging; +using Xunit.Abstractions; + +namespace Wolverine.Nats.Tests.Helpers; + +/// +/// Logger provider that writes to xUnit test output +/// +public class XunitLoggerProvider : ILoggerProvider +{ + private readonly ITestOutputHelper _output; + + public XunitLoggerProvider(ITestOutputHelper output) + { + _output = output; + } + + public ILogger CreateLogger(string categoryName) + { + return new XunitLogger(categoryName, _output); + } + + public void Dispose() { } +} + +/// +/// Logger that writes to xUnit test output +/// +public class XunitLogger : ILogger +{ + private readonly string _categoryName; + private readonly ITestOutputHelper _output; + + public XunitLogger(string categoryName, ITestOutputHelper output) + { + _categoryName = categoryName; + _output = output; + } + + public IDisposable? BeginScope(TState state) where TState : notnull + { + return null; + } + + public bool IsEnabled(LogLevel logLevel) => true; + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter) + { + var timestamp = DateTime.Now.ToString("HH:mm:ss.fff"); + var message = formatter(state, exception); + + // Short category name for readability + var shortCategory = _categoryName.Contains('.') + ? _categoryName.Substring(_categoryName.LastIndexOf('.') + 1) + : _categoryName; + + _output.WriteLine($"[{timestamp}] [{logLevel,-5}] {shortCategory}: {message}"); + + if (exception != null) + { + _output.WriteLine($"Exception: {exception.GetType().Name}: {exception.Message}"); + _output.WriteLine(exception.StackTrace); + } + } +} + +/// +/// Extension methods for configuring xUnit logging +/// +public static class XunitLoggingExtensions +{ + /// + /// Adds xUnit test output logging to the logging builder + /// + public static ILoggingBuilder AddXunitLogging(this ILoggingBuilder builder, ITestOutputHelper output) + { + builder.ClearProviders(); + builder.AddProvider(new XunitLoggerProvider(output)); + builder.SetMinimumLevel(LogLevel.Debug); + return builder; + } +} \ No newline at end of file diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/MultiTenancyTests.cs b/src/Transports/NATS/Wolverine.Nats.Tests/MultiTenancyTests.cs new file mode 100644 index 000000000..aea2a9c5e --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats.Tests/MultiTenancyTests.cs @@ -0,0 +1,616 @@ +using JasperFx.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using NSubstitute; +using Shouldly; +using Wolverine.Nats.Internal; +using Wolverine.Nats.Tests.Helpers; +using Wolverine.Tracking; +using Wolverine.Transports.Sending; +using Xunit; +using Xunit.Abstractions; + +namespace Wolverine.Nats.Tests; + +#region Unit Tests - No NATS Infrastructure Required + +/// +/// Unit tests for the DefaultTenantSubjectMapper - no NATS connection required +/// +public class DefaultTenantSubjectMapperTests +{ + [Fact] + public void maps_subject_with_tenant_prefix() + { + var mapper = new DefaultTenantSubjectMapper(); + + mapper.MapSubject("orders", "tenant1").ShouldBe("tenant1.orders"); + mapper.MapSubject("orders", "tenant2").ShouldBe("tenant2.orders"); + } + + [Fact] + public void maps_nested_subject_with_tenant_prefix() + { + var mapper = new DefaultTenantSubjectMapper(); + + mapper.MapSubject("orders.created", "tenant1").ShouldBe("tenant1.orders.created"); + mapper.MapSubject("orders.shipped.international", "tenant2").ShouldBe("tenant2.orders.shipped.international"); + } + + [Fact] + public void returns_original_subject_when_tenant_is_empty() + { + var mapper = new DefaultTenantSubjectMapper(); + + mapper.MapSubject("orders", "").ShouldBe("orders"); + mapper.MapSubject("orders", null!).ShouldBe("orders"); + } + + [Fact] + public void extracts_tenant_id_from_subject() + { + var mapper = new DefaultTenantSubjectMapper(); + + mapper.ExtractTenantId("tenant1.orders").ShouldBe("tenant1"); + mapper.ExtractTenantId("tenant2.orders").ShouldBe("tenant2"); + } + + [Fact] + public void extracts_tenant_id_from_nested_subject() + { + var mapper = new DefaultTenantSubjectMapper(); + + mapper.ExtractTenantId("tenant1.orders.created").ShouldBe("tenant1"); + } + + [Fact] + public void returns_null_when_no_tenant_in_subject() + { + var mapper = new DefaultTenantSubjectMapper(); + + mapper.ExtractTenantId("orders").ShouldBeNull(); + mapper.ExtractTenantId("").ShouldBeNull(); + mapper.ExtractTenantId(null!).ShouldBeNull(); + } + + [Fact] + public void generates_wildcard_subscription_pattern() + { + var mapper = new DefaultTenantSubjectMapper(); + + mapper.GetSubscriptionPattern("orders").ShouldBe("*.orders"); + mapper.GetSubscriptionPattern("orders.created").ShouldBe("*.orders.created"); + } + + [Fact] + public void normalizes_slashes_in_tenant_id() + { + var mapper = new DefaultTenantSubjectMapper(); + mapper.MapSubject("orders", "org/team").ShouldBe("org.team.orders"); + } + + [Fact] + public void custom_separator_works() + { + var mapper = new DefaultTenantSubjectMapper(separator: "-"); + + mapper.MapSubject("orders", "tenant1").ShouldBe("tenant1-orders"); + mapper.ExtractTenantId("tenant1-orders").ShouldBe("tenant1"); + mapper.GetSubscriptionPattern("orders").ShouldBe("*-orders"); + } +} + +/// +/// Unit tests for TenantedSender routing logic - uses mocked senders +/// Following the pattern from Wolverine's CoreTests/Transports/Sending/TenantedSenderTests.cs +/// +public class TenantedSenderRoutingTests +{ + private readonly ISender _defaultSender = Substitute.For(); + private readonly ISender _tenant1Sender = Substitute.For(); + private readonly ISender _tenant2Sender = Substitute.For(); + private readonly ISender _tenant3Sender = Substitute.For(); + + public TenantedSenderRoutingTests() + { + _defaultSender.Destination.Returns(new Uri("nats://subject/orders")); + _tenant1Sender.Destination.Returns(new Uri("nats://subject/tenant1.orders")); + _tenant2Sender.Destination.Returns(new Uri("nats://subject/tenant2.orders")); + _tenant3Sender.Destination.Returns(new Uri("nats://subject/tenant3.orders")); + } + + [Fact] + public async Task routes_messages_to_correct_tenant_sender() + { + var tenantedSender = new TenantedSender( + new Uri("nats://subject/orders"), + TenantedIdBehavior.TenantIdRequired, + null); + + tenantedSender.RegisterSender("tenant1", _tenant1Sender); + tenantedSender.RegisterSender("tenant2", _tenant2Sender); + tenantedSender.RegisterSender("tenant3", _tenant3Sender); + + var e1 = new Envelope { TenantId = "tenant1" }; + var e2 = new Envelope { TenantId = "tenant2" }; + var e3 = new Envelope { TenantId = "tenant3" }; + + await tenantedSender.SendAsync(e1); + await tenantedSender.SendAsync(e2); + await tenantedSender.SendAsync(e3); + + await _tenant1Sender.Received(1).SendAsync(e1); + await _tenant2Sender.Received(1).SendAsync(e2); + await _tenant3Sender.Received(1).SendAsync(e3); + } + + [Fact] + public void throws_when_default_sender_required_but_not_provided() + { + Should.Throw(() => + { + _ = new TenantedSender( + new Uri("nats://subject/orders"), + TenantedIdBehavior.FallbackToDefault, + null); + }); + } + + [Fact] + public async Task throws_when_tenant_id_required_but_missing() + { + var tenantedSender = new TenantedSender( + new Uri("nats://subject/orders"), + TenantedIdBehavior.TenantIdRequired, + null); + + tenantedSender.RegisterSender("tenant1", _tenant1Sender); + + var envelope = new Envelope { TenantId = null }; + + await Should.ThrowAsync(async () => + { + await tenantedSender.SendAsync(envelope); + }); + } + + [Fact] + public async Task throws_when_tenant_id_required_but_empty() + { + var tenantedSender = new TenantedSender( + new Uri("nats://subject/orders"), + TenantedIdBehavior.TenantIdRequired, + null); + + tenantedSender.RegisterSender("tenant1", _tenant1Sender); + + var envelope = new Envelope { TenantId = string.Empty }; + + await Should.ThrowAsync(async () => + { + await tenantedSender.SendAsync(envelope); + }); + } + + [Fact] + public async Task falls_back_to_default_sender_when_tenant_missing() + { + var tenantedSender = new TenantedSender( + new Uri("nats://subject/orders"), + TenantedIdBehavior.FallbackToDefault, + _defaultSender); + + tenantedSender.RegisterSender("tenant1", _tenant1Sender); + tenantedSender.RegisterSender("tenant2", _tenant2Sender); + + var e1 = new Envelope { TenantId = null }; + var e2 = new Envelope { TenantId = "tenant2" }; + var e3 = new Envelope { TenantId = string.Empty }; + + await tenantedSender.SendAsync(e1); + await tenantedSender.SendAsync(e2); + await tenantedSender.SendAsync(e3); + + await _defaultSender.Received(1).SendAsync(e1); + await _tenant2Sender.Received(1).SendAsync(e2); + await _defaultSender.Received(1).SendAsync(e3); + } + + [Fact] + public async Task falls_back_to_default_for_unknown_tenant() + { + var tenantedSender = new TenantedSender( + new Uri("nats://subject/orders"), + TenantedIdBehavior.FallbackToDefault, + _defaultSender); + + tenantedSender.RegisterSender("tenant1", _tenant1Sender); + + var envelope = new Envelope { TenantId = "unknown_tenant" }; + + await tenantedSender.SendAsync(envelope); + + await _defaultSender.Received(1).SendAsync(envelope); + } + + [Fact] + public async Task throws_for_unknown_tenant_when_required() + { + var tenantedSender = new TenantedSender( + new Uri("nats://subject/orders"), + TenantedIdBehavior.TenantIdRequired, + null); + + tenantedSender.RegisterSender("tenant1", _tenant1Sender); + + var envelope = new Envelope { TenantId = "unknown_tenant" }; + + await Should.ThrowAsync(async () => + { + await tenantedSender.SendAsync(envelope); + }); + } +} + +/// +/// Unit tests for NatsTenant configuration +/// +public class NatsTenantTests +{ + [Fact] + public void creates_tenant_with_id() + { + var tenant = new NatsTenant("tenant1"); + + tenant.TenantId.ShouldBe("tenant1"); + tenant.SubjectMapper.ShouldBeNull(); + tenant.ConnectionString.ShouldBeNull(); + } + + [Fact] + public void throws_when_tenant_id_is_null() + { + Should.Throw(() => new NatsTenant(null!)); + } + + [Fact] + public void can_set_custom_subject_mapper() + { + var tenant = new NatsTenant("tenant1"); + var mapper = new DefaultTenantSubjectMapper(separator: "-"); + + tenant.SubjectMapper = mapper; + + tenant.SubjectMapper.ShouldBe(mapper); + } +} + +#endregion + +#region Integration Tests - Require NATS Connection + +/// +/// Integration tests for NATS multi-tenancy using the dual-host pattern +/// (sender and receiver as separate hosts) for reliable tracking. +/// +[Collection("NATS Integration")] +[Trait("Category", "Integration")] +public class MultiTenancyIntegrationTests : IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private IHost? _sender; + private IHost? _receiver; + private string _baseSubject = null!; + private string? _natsUrl; + + public MultiTenancyIntegrationTests(ITestOutputHelper output) + { + _output = output; + } + + public async Task InitializeAsync() + { + _natsUrl = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://localhost:4222"; + _baseSubject = $"test.multitenancy.{Guid.NewGuid():N}"; + + _output.WriteLine($"Using NATS URL: {_natsUrl}"); + _output.WriteLine($"Base subject: {_baseSubject}"); + + if (!await IsNatsAvailable(_natsUrl)) + { + _output.WriteLine("NATS not available, skipping test"); + return; + } + + _sender = await Host.CreateDefaultBuilder() + .ConfigureLogging(logging => logging.AddXunitLogging(_output)) + .UseWolverine(opts => + { + opts.ServiceName = "MultiTenancySender"; + + opts.UseNats(_natsUrl) + .ConfigureMultiTenancy(TenantedIdBehavior.FallbackToDefault) + .AddTenant("tenant1") + .AddTenant("tenant2"); + + opts.PublishMessage() + .ToNatsSubject(_baseSubject); + }) + .StartAsync(); + + _receiver = await Host.CreateDefaultBuilder() + .ConfigureLogging(logging => logging.AddXunitLogging(_output)) + .UseWolverine(opts => + { + opts.ServiceName = "MultiTenancyReceiver"; + + opts.UseNats(_natsUrl) + .ConfigureMultiTenancy(TenantedIdBehavior.FallbackToDefault) + .AddTenant("tenant1") + .AddTenant("tenant2"); + + opts.ListenToNatsSubject(_baseSubject); + }) + .StartAsync(); + } + + public async Task DisposeAsync() + { + if (_sender != null) + { + await _sender.StopAsync(); + _sender.Dispose(); + } + if (_receiver != null) + { + await _receiver.StopAsync(); + _receiver.Dispose(); + } + } + + [Fact] + public async Task messages_are_routed_to_tenant_specific_subjects() + { + if (_sender == null || _receiver == null) + { + _output.WriteLine("NATS not available, skipping test"); + return; + } + + var msg1 = new TenantTestMessage(Guid.NewGuid(), "Message for tenant 1"); + + _output.WriteLine($"Sending message {msg1.Id} with tenant1"); + + var session = await _sender + .TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .SendMessageAndWaitAsync(msg1, new DeliveryOptions { TenantId = "tenant1" }); + + var sentMessage = session.Sent.SingleMessage(); + sentMessage.Id.ShouldBe(msg1.Id); + + var receivedEnvelope = session.Received.SingleEnvelope(); + receivedEnvelope.TenantId.ShouldBe("tenant1"); + receivedEnvelope.Message.ShouldBeOfType().Id.ShouldBe(msg1.Id); + } + + [Fact] + public async Task different_tenants_route_to_different_subjects() + { + if (_sender == null || _receiver == null) + { + _output.WriteLine("NATS not available, skipping test"); + return; + } + + var msg1 = new TenantTestMessage(Guid.NewGuid(), "Message for tenant 1"); + var msg2 = new TenantTestMessage(Guid.NewGuid(), "Message for tenant 2"); + + _output.WriteLine($"Sending message {msg1.Id} with tenant1"); + _output.WriteLine($"Sending message {msg2.Id} with tenant2"); + + var session1 = await _sender + .TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .SendMessageAndWaitAsync(msg1, new DeliveryOptions { TenantId = "tenant1" }); + + var received1 = session1.Received.SingleEnvelope(); + received1.TenantId.ShouldBe("tenant1"); + received1.Message.ShouldBeOfType().Id.ShouldBe(msg1.Id); + + var session2 = await _sender + .TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .SendMessageAndWaitAsync(msg2, new DeliveryOptions { TenantId = "tenant2" }); + + var received2 = session2.Received.SingleEnvelope(); + received2.TenantId.ShouldBe("tenant2"); + received2.Message.ShouldBeOfType().Id.ShouldBe(msg2.Id); + } + + [Fact] + public async Task fallback_to_default_sends_to_base_subject() + { + if (_sender == null || _receiver == null) + { + _output.WriteLine("NATS not available, skipping test"); + return; + } + + var msgWithTenant = new TenantTestMessage(Guid.NewGuid(), "With tenant"); + var msgWithoutTenant = new TenantTestMessage(Guid.NewGuid(), "Without tenant"); + + var session1 = await _sender + .TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .SendMessageAndWaitAsync(msgWithTenant, new DeliveryOptions { TenantId = "tenant1" }); + + var tenantReceived = session1.Received.SingleEnvelope(); + tenantReceived.TenantId.ShouldBe("tenant1"); + + var session2 = await _sender + .TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .SendMessageAndWaitAsync(msgWithoutTenant); + + var defaultReceived = session2.Received.SingleEnvelope(); + defaultReceived.TenantId.ShouldBeNull(); + } + + private async Task IsNatsAvailable(string natsUrl) + { + try + { + using var testHost = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseNats(natsUrl); + }) + .StartAsync(); + + await testHost.StopAsync(); + return true; + } + catch + { + return false; + } + } +} + +/// +/// Tests for TenantIdRequired behavior +/// +[Collection("NATS Integration")] +[Trait("Category", "Integration")] +public class TenantIdRequiredBehaviorTests : IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private IHost? _sender; + private IHost? _receiver; + private string _baseSubject = null!; + private string? _natsUrl; + + public TenantIdRequiredBehaviorTests(ITestOutputHelper output) + { + _output = output; + } + + public async Task InitializeAsync() + { + _natsUrl = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://localhost:4222"; + _baseSubject = $"test.required.{Guid.NewGuid():N}"; + + if (!await IsNatsAvailable(_natsUrl)) + { + _output.WriteLine("NATS not available, skipping test"); + return; + } + + _sender = await Host.CreateDefaultBuilder() + .ConfigureLogging(logging => logging.AddXunitLogging(_output)) + .UseWolverine(opts => + { + opts.ServiceName = "TenantRequiredSender"; + + opts.UseNats(_natsUrl) + .ConfigureMultiTenancy(TenantedIdBehavior.TenantIdRequired) + .AddTenant("tenant1"); + + opts.PublishMessage() + .ToNatsSubject(_baseSubject); + }) + .StartAsync(); + + _receiver = await Host.CreateDefaultBuilder() + .ConfigureLogging(logging => logging.AddXunitLogging(_output)) + .UseWolverine(opts => + { + opts.ServiceName = "TenantRequiredReceiver"; + + opts.UseNats(_natsUrl) + .ConfigureMultiTenancy(TenantedIdBehavior.TenantIdRequired) + .AddTenant("tenant1"); + + opts.ListenToNatsSubject(_baseSubject); + }) + .StartAsync(); + } + + public async Task DisposeAsync() + { + if (_sender != null) + { + await _sender.StopAsync(); + _sender.Dispose(); + } + if (_receiver != null) + { + await _receiver.StopAsync(); + _receiver.Dispose(); + } + } + + [Fact] + public async Task message_with_tenant_id_is_sent_and_received_successfully() + { + if (_sender == null || _receiver == null) + { + _output.WriteLine("NATS not available, skipping test"); + return; + } + + var msg = new TenantTestMessage(Guid.NewGuid(), "With tenant"); + + var session = await _sender + .TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .SendMessageAndWaitAsync(msg, new DeliveryOptions { TenantId = "tenant1" }); + + var received = session.Received.SingleEnvelope(); + received.TenantId.ShouldBe("tenant1"); + received.Message.ShouldBeOfType().Id.ShouldBe(msg.Id); + } + + private async Task IsNatsAvailable(string natsUrl) + { + try + { + using var testHost = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseNats(natsUrl); + }) + .StartAsync(); + + await testHost.StopAsync(); + return true; + } + catch + { + return false; + } + } +} + +#endregion + +#region Supporting Types + +public record TenantTestMessage(Guid Id, string Content); + +public class TenantTestMessageHandler +{ + public void Handle(TenantTestMessage message) + { + } +} + +#endregion diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/NatsIntegrationTestCollection.cs b/src/Transports/NATS/Wolverine.Nats.Tests/NatsIntegrationTestCollection.cs new file mode 100644 index 000000000..eb06efc0d --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats.Tests/NatsIntegrationTestCollection.cs @@ -0,0 +1,6 @@ +using Xunit; + +namespace Wolverine.Nats.Tests; + +[CollectionDefinition("NATS Integration", DisableParallelization = true)] +public class NatsIntegrationTestCollection; diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/NatsTransportComplianceTests.cs b/src/Transports/NATS/Wolverine.Nats.Tests/NatsTransportComplianceTests.cs new file mode 100644 index 000000000..83503ba8c --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats.Tests/NatsTransportComplianceTests.cs @@ -0,0 +1,147 @@ +#if true + +using JasperFx.Core; +using Wolverine.ComplianceTests.Compliance; +using Xunit; + +namespace Wolverine.Nats.Tests; + +public class InlineNatsTransportFixture : TransportComplianceFixture, IAsyncLifetime +{ + public static int Counter = 0; + + public InlineNatsTransportFixture() : base(new Uri("nats://subject/compliance.receiver"), 60) + { + } + + public async Task InitializeAsync() + { + var number = ++Counter; + var receiverSubject = $"compliance.receiver.inline.{number}"; + var senderSubject = $"compliance.sender.inline.{number}"; + + OutboundAddress = new Uri($"nats://subject/{receiverSubject}"); + + var natsUrl = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://localhost:4222"; + + await SenderIs(opts => + { + opts.UseNats(natsUrl).AutoProvision(); + opts.ListenToNatsSubject(senderSubject).ProcessInline(); + opts.PublishAllMessages().ToNatsSubject(receiverSubject).SendInline(); + }); + + await ReceiverIs(opts => + { + opts.UseNats(natsUrl).AutoProvision(); + opts.ListenToNatsSubject(receiverSubject).Named("receiver").ProcessInline(); + }); + } + + public new Task DisposeAsync() + { + return Task.CompletedTask; + } +} + +[Collection("NATS Compliance")] +public class InlineNatsTransportComplianceTests : TransportCompliance; + +public class BufferedNatsTransportFixture : TransportComplianceFixture, IAsyncLifetime +{ + public static int Counter = 0; + + public BufferedNatsTransportFixture() : base(new Uri("nats://subject/compliance.receiver"), 60) + { + } + + public async Task InitializeAsync() + { + var number = ++Counter; + var receiverSubject = $"compliance.receiver.buffered.{number}"; + var senderSubject = $"compliance.sender.buffered.{number}"; + + OutboundAddress = new Uri($"nats://subject/{receiverSubject}"); + + var natsUrl = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://localhost:4222"; + + await SenderIs(opts => + { + opts.UseNats(natsUrl).AutoProvision(); + opts.ListenToNatsSubject(senderSubject).BufferedInMemory(); + opts.PublishAllMessages().ToNatsSubject(receiverSubject).BufferedInMemory(); + }); + + await ReceiverIs(opts => + { + opts.UseNats(natsUrl).AutoProvision(); + opts.ListenToNatsSubject(receiverSubject).Named("receiver").BufferedInMemory(); + }); + } + + public new Task DisposeAsync() + { + return Task.CompletedTask; + } +} + +[Collection("NATS Compliance")] +public class BufferedNatsTransportComplianceTests : TransportCompliance; + +public class JetStreamNatsTransportFixture : TransportComplianceFixture, IAsyncLifetime +{ + public static int Counter = 0; + + public JetStreamNatsTransportFixture() : base(new Uri("nats://subject/compliance.receiver"), 120) + { + } + + public async Task InitializeAsync() + { + var number = ++Counter; + var streamName = $"COMPLIANCE_{number}"; + var receiverSubject = $"compliance.receiver.js.{number}"; + var senderSubject = $"compliance.sender.js.{number}"; + + OutboundAddress = new Uri($"nats://subject/{receiverSubject}"); + + var natsUrl = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://localhost:4222"; + + await SenderIs(opts => + { + opts.UseNats(natsUrl) + .AutoProvision() + .UseJetStream(js => js.MaxDeliver = 5) + .DefineWorkQueueStream(streamName, s => s.EnableScheduledDelivery(), $"compliance.*.js.{number}"); + + opts.ListenToNatsSubject(senderSubject) + .UseJetStream(streamName, $"sender-consumer-{number}"); + + opts.PublishAllMessages().ToNatsSubject(receiverSubject); + }); + + await ReceiverIs(opts => + { + opts.UseNats(natsUrl) + .AutoProvision() + .UseJetStream(js => js.MaxDeliver = 5) + .DefineWorkQueueStream(streamName, s => s.EnableScheduledDelivery(), $"compliance.*.js.{number}"); + + opts.ListenToNatsSubject(receiverSubject) + .Named("receiver") + .UseJetStream(streamName, $"receiver-consumer-{number}"); + }); + } + + public new Task DisposeAsync() + { + return Task.CompletedTask; + } + + +} + +[Collection("NATS Compliance")] +public class JetStreamNatsTransportComplianceTests : TransportCompliance; + +#endif diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/NatsTransportIntegrationTests.cs b/src/Transports/NATS/Wolverine.Nats.Tests/NatsTransportIntegrationTests.cs new file mode 100644 index 000000000..a8dfb8690 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats.Tests/NatsTransportIntegrationTests.cs @@ -0,0 +1,215 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using JasperFx.Core; +using Wolverine.Nats.Internal; +using Wolverine.Nats.Tests.Helpers; +using Wolverine.Runtime; +using Wolverine.Tracking; +using Xunit; +using Xunit.Abstractions; +using FluentAssertions; + +namespace Wolverine.Nats.Tests; + +[Collection("NATS Integration Tests")] +[Trait("Category", "Integration")] +public class NatsTransportIntegrationTests : IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private IHost? _sender; + private IHost? _receiver; + private static int _counter = 0; + private string _receiverSubject = ""; + + public NatsTransportIntegrationTests(ITestOutputHelper output) + { + _output = output; + } + + public async Task InitializeAsync() + { + var natsUrl = Environment.GetEnvironmentVariable("NATS_URL"); + + if (string.IsNullOrEmpty(natsUrl)) + { + if (await IsNatsAvailable("nats://localhost:4222")) + { + natsUrl = "nats://localhost:4222"; + } + return; + + } + + if (!await IsNatsAvailable(natsUrl)) + { + return; + } + + var number = ++_counter; + _receiverSubject = $"test.receiver.{number}"; + + _sender = await Host.CreateDefaultBuilder() + .ConfigureLogging(logging => logging.AddXunitLogging(_output)) + .UseWolverine(opts => + { + opts.ServiceName = "Sender"; + opts.UseNats(natsUrl).AutoProvision(); + opts.PublishAllMessages().ToNatsSubject(_receiverSubject); + }) + .StartAsync(); + + _receiver = await Host.CreateDefaultBuilder() + .ConfigureLogging(logging => logging.AddXunitLogging(_output)) + .UseWolverine(opts => + { + opts.ServiceName = "Receiver"; + opts.UseNats(natsUrl).AutoProvision(); + opts.ListenToNatsSubject(_receiverSubject).Named("receiver"); + }) + .StartAsync(); + } + + public async Task DisposeAsync() + { + if (_sender != null) + await _sender.StopAsync(); + if (_receiver != null) + await _receiver.StopAsync(); + _sender?.Dispose(); + _receiver?.Dispose(); + } + + [Fact] + public async Task send_message_to_and_receive_through_nats() + { + if (_sender == null || _receiver == null) + return; // Skip if NATS not available + + var message = new TestMessage(Guid.NewGuid(), "Hello NATS!"); + + var tracked = await _sender + .TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .SendMessageAndWaitAsync(message); + + tracked.Sent.SingleMessage().Should().BeEquivalentTo(message); + + tracked.Received.SingleMessage().Should().BeEquivalentTo(message); + } + + [Fact] + public async Task can_send_and_receive_multiple_messages() + { + if (_sender == null || _receiver == null) + return; + + var messages = new[] + { + new TestMessage(Guid.NewGuid(), "Message 1"), + new TestMessage(Guid.NewGuid(), "Message 2"), + new TestMessage(Guid.NewGuid(), "Message 3") + }; + + foreach (var message in messages) + { + var tracked = await _sender + .TrackActivity() + .AlsoTrack(_receiver) + .Timeout(30.Seconds()) + .SendMessageAndWaitAsync(message); + + tracked.Sent.SingleMessage().Should().BeEquivalentTo(message); + + tracked.Received.SingleMessage().Should().BeEquivalentTo(message); + } + } + + [Fact] + public void nats_transport_is_registered() + { + if (_sender == null) + return; + + var runtime = _sender.Services.GetRequiredService(); + var transport = runtime.Options.Transports.GetOrCreate(); + + transport.Should().NotBeNull(); + transport.Protocol.Should().Be("nats"); + } + + [Fact] + public void endpoints_are_configured_correctly() + { + if (_receiver == null) + return; + + var runtime = _receiver.Services.GetRequiredService(); + var transport = runtime.Options.Transports.GetOrCreate(); + + var endpointUri = new Uri($"nats://subject/{_receiverSubject}"); + var endpoint = transport.TryGetEndpoint(endpointUri); + + endpoint.Should().NotBeNull(); + endpoint.Should().BeOfType(); + + var natsEndpoint = (NatsEndpoint)endpoint!; + natsEndpoint.Subject.Should().Be(_receiverSubject); + natsEndpoint.EndpointName.Should().Be("receiver"); + } + + [Fact] + public void server_version_is_detected_and_scheduled_send_support_is_determined() + { + if (_sender == null) + return; + + var runtime = _sender.Services.GetRequiredService(); + var transport = runtime.Options.Transports.GetOrCreate(); + + transport.Connection.ServerInfo.Should().NotBeNull(); + transport.Connection.ServerInfo!.Version.Should().NotBeNullOrEmpty(); + + _output.WriteLine($"NATS Server Version: {transport.Connection.ServerInfo.Version}"); + _output.WriteLine($"ServerSupportsScheduledSend: {transport.ServerSupportsScheduledSend}"); + + var versionString = transport.Connection.ServerInfo.Version.Split('-')[0]; + if (Version.TryParse(versionString, out var serverVersion)) + { + var minVersion = new Version(2, 12, 0); + var expectedSupport = serverVersion >= minVersion; + + transport.ServerSupportsScheduledSend.Should().Be(expectedSupport, + $"Server version {serverVersion} should {(expectedSupport ? "" : "not ")}support scheduled send (min: {minVersion})"); + } + } + + private async Task IsNatsAvailable(string natsUrl) + { + try + { + using var testHost = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseNats(natsUrl); + }) + .StartAsync(); + + await testHost.StopAsync(); + return true; + } + catch + { + return false; + } + } +} + +public record TestMessage(Guid Id, string Text); + +public class TestMessageHandler +{ + public void Handle(TestMessage message) + { + } +} diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/NoParallelization.cs b/src/Transports/NATS/Wolverine.Nats.Tests/NoParallelization.cs new file mode 100644 index 000000000..217120083 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats.Tests/NoParallelization.cs @@ -0,0 +1,3 @@ +using Xunit; + +[assembly: CollectionBehavior(DisableTestParallelization = true)] diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/RequestReplyTests.cs b/src/Transports/NATS/Wolverine.Nats.Tests/RequestReplyTests.cs new file mode 100644 index 000000000..68944547c --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats.Tests/RequestReplyTests.cs @@ -0,0 +1,132 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using NATS.Client.Core; +using Wolverine.Nats.Tests.Helpers; +using Wolverine.Runtime.Routing; +using Wolverine.Tracking; +using Xunit; +using Xunit.Abstractions; + +namespace Wolverine.Nats.Tests; + +[Collection("NATS Integration Tests")] +[Trait("Category", "Integration")] +public class RequestReplyTests : IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private IHost? _host; + + public RequestReplyTests(ITestOutputHelper output) + { + _output = output; + } + + public async Task InitializeAsync() + { + var natsUrl = Environment.GetEnvironmentVariable("NATS_URL") ?? "nats://localhost:4222"; + + if (!await IsNatsServerAvailable(natsUrl)) + { + _output.WriteLine($"NATS server not available at {natsUrl}. Skipping integration tests."); + return; + } + + _host = await Host.CreateDefaultBuilder() + .ConfigureLogging(logging => logging.AddXunitLogging(_output)) + .UseWolverine(opts => + { + opts.ServiceName = "RequestReplyTest"; + opts.UseNats(natsUrl); + + // Configure publishing + opts.PublishMessage().ToNatsSubject("ping.request"); + + // Configure listening + opts.ListenToNatsSubject("ping.request"); + }) + .StartAsync(); + } + + private async Task IsNatsServerAvailable(string natsUrl) + { + try + { + await using var connection = new NatsConnection(NatsOpts.Default with { Url = natsUrl }); + await connection.ConnectAsync(); + return connection.ConnectionState == NatsConnectionState.Open; + } + catch + { + return false; + } + } + + public async Task DisposeAsync() + { + if (_host != null) + { + await _host.StopAsync(); + _host.Dispose(); + } + } + + [Fact] + public async Task can_send_request_and_receive_reply() + { + // Skip if NATS server not available or host not initialized + if (_host == null) + { + _output.WriteLine("Skipping test - NATS server not available"); + return; + } + + var (session, response) = await _host + .TrackActivity() + .Timeout(TimeSpan.FromSeconds(10)) + .InvokeAndWaitAsync(new PingMessage { Name = "TestPing" }); + + Assert.NotNull(response); + Assert.Equal("Hello TestPing", response.Message); + } + + + [Fact] + public async Task throws_unknown_endpoint_exception_for_invalid_endpoint() + { + // Skip if NATS server not available or host not initialized + if (_host == null) + { + _output.WriteLine("Skipping test - NATS server not available"); + return; + } + + var bus = _host.Services.GetRequiredService(); + + await Assert.ThrowsAsync(async () => + { + await bus.EndpointFor("nats://nonexistent.subject") + .InvokeAsync( + new PingMessage { Name = "NoEndpoint" }, + timeout: TimeSpan.FromSeconds(1) + ); + }); + } +} + +public class PingMessage +{ + public string Name { get; set; } = ""; +} + +public class PongMessage +{ + public string Message { get; set; } = ""; +} + +public class PingHandler +{ + public PongMessage Handle(PingMessage ping) + { + return new PongMessage { Message = $"Hello {ping.Name}" }; + } +} diff --git a/src/Transports/NATS/Wolverine.Nats.Tests/Wolverine.Nats.Tests.csproj b/src/Transports/NATS/Wolverine.Nats.Tests/Wolverine.Nats.Tests.csproj new file mode 100644 index 000000000..17bc6c793 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats.Tests/Wolverine.Nats.Tests.csproj @@ -0,0 +1,36 @@ + + + + false + true + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + + + + + + Servers.cs + + + + diff --git a/src/Transports/NATS/Wolverine.Nats/Configuration/NatsListenerConfiguration.cs b/src/Transports/NATS/Wolverine.Nats/Configuration/NatsListenerConfiguration.cs new file mode 100644 index 000000000..534090bf0 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Configuration/NatsListenerConfiguration.cs @@ -0,0 +1,86 @@ +using Wolverine.Configuration; +using Wolverine.Nats.Internal; + +namespace Wolverine.Nats.Configuration; + +public class NatsListenerConfiguration + : ListenerConfiguration +{ + public NatsListenerConfiguration(NatsEndpoint endpoint) + : base(endpoint) { } + + /// + /// Use JetStream for durable messaging + /// + public NatsListenerConfiguration UseJetStream( + string? streamName = null, + string? consumerName = null + ) + { + add(endpoint => + { + endpoint.UseJetStream = true; + endpoint.StreamName = streamName ?? endpoint.Subject.Replace(".", "_").ToUpper(); + endpoint.ConsumerName = consumerName; + }); + + return this; + } + + /// + /// Use a queue group for load balancing (Core NATS only) + /// + public NatsListenerConfiguration UseQueueGroup(string queueGroup) + { + add(endpoint => + { + endpoint.QueueGroup = queueGroup; + }); + + return this; + } + + /// + /// Configure dead letter queue settings for this NATS listener + /// + public NatsListenerConfiguration ConfigureDeadLetterQueue( + int maxDeliveryAttempts, + string? deadLetterSubject = null + ) + { + add(endpoint => + { + endpoint.DeadLetterQueueEnabled = true; + endpoint.DeadLetterSubject = deadLetterSubject; + endpoint.MaxDeliveryAttempts = maxDeliveryAttempts; + }); + + return this; + } + + /// + /// Disable dead letter queue handling for this listener + /// + public NatsListenerConfiguration DisableDeadLetterQueueing() + { + add(endpoint => + { + endpoint.DeadLetterQueueEnabled = false; + }); + + return this; + } + + /// + /// Configure the dead letter subject for failed messages + /// + public NatsListenerConfiguration DeadLetterTo(string deadLetterSubject) + { + add(endpoint => + { + endpoint.DeadLetterSubject = deadLetterSubject; + }); + + return this; + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Configuration/NatsSubscriberConfiguration.cs b/src/Transports/NATS/Wolverine.Nats/Configuration/NatsSubscriberConfiguration.cs new file mode 100644 index 000000000..c62fbb118 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Configuration/NatsSubscriberConfiguration.cs @@ -0,0 +1,28 @@ +using Wolverine.Configuration; +using Wolverine.Nats.Internal; + +namespace Wolverine.Nats.Configuration; + +/// +/// Configuration for NATS publishers/subscribers +/// +public class NatsSubscriberConfiguration + : SubscriberConfiguration +{ + public NatsSubscriberConfiguration(NatsEndpoint endpoint) + : base(endpoint) { } + + /// + /// Use JetStream for durable message publishing + /// + public NatsSubscriberConfiguration UseJetStream(string? streamName = null) + { + add(endpoint => + { + endpoint.UseJetStream = true; + endpoint.StreamName = streamName ?? endpoint.Subject.Replace(".", "_").ToUpper(); + }); + + return this; + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Configuration/NatsTransportConfiguration.cs b/src/Transports/NATS/Wolverine.Nats/Configuration/NatsTransportConfiguration.cs new file mode 100644 index 000000000..77d0c90f4 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Configuration/NatsTransportConfiguration.cs @@ -0,0 +1,106 @@ +using NATS.Client.Core; +using NATS.Client.JetStream; + +namespace Wolverine.Nats.Configuration; + +public class NatsTransportConfiguration +{ + public string ConnectionString { get; set; } = "nats://localhost:4222"; + public string? ClientName { get; set; } + public TimeSpan ConnectTimeout { get; set; } = TimeSpan.FromSeconds(10); + public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromSeconds(30); + + public string? Username { get; set; } + public string? Password { get; set; } + public string? Token { get; set; } + public string? Jwt { get; set; } + public string? NKeySeed { get; set; } + public string? CredentialsFile { get; set; } + public string? NKeyFile { get; set; } + public Func>? AuthCallback { get; set; } + + public bool EnableTls { get; set; } + public TlsMode TlsMode { get; set; } = TlsMode.Auto; + public bool TlsInsecure { get; set; } + public string? ClientCertFile { get; set; } + public string? ClientKeyFile { get; set; } + public string? CaFile { get; set; } + + public bool EnableJetStream { get; set; } = true; + public string? JetStreamDomain { get; set; } + public string? JetStreamApiPrefix { get; set; } + + public bool AutoProvision { get; set; } = true; + public string? DefaultQueueGroup { get; set; } + public bool NormalizeSubjects { get; set; } = true; + public JetStreamDefaults JetStreamDefaults { get; set; } = new(); + + public ITenantIdResolver? TenantIdResolver { get; set; } + public ISubjectResolver? SubjectResolver { get; set; } + public string? TenantSubjectPrefix { get; set; } + public Dictionary Streams { get; set; } = new(); + + internal NatsOpts ToNatsOpts() + { + return NatsOpts.Default with + { + Url = ConnectionString, + Name = ClientName ?? "wolverine-nats", + ConnectTimeout = ConnectTimeout, + CommandTimeout = RequestTimeout, + AuthOpts = new NatsAuthOpts + { + Username = Username, + Password = Password, + Token = Token, + Jwt = Jwt, + Seed = NKeySeed, + CredsFile = CredentialsFile, + NKeyFile = NKeyFile, + AuthCredCallback = AuthCallback + }, + TlsOpts = new NatsTlsOpts + { + Mode = TlsMode, + InsecureSkipVerify = TlsInsecure, + CertFile = ClientCertFile, + KeyFile = ClientKeyFile, + CaFile = CaFile + } + }; + } + + internal NatsJSOpts? ToJetStreamOpts() + { + if (!EnableJetStream) + { + return null; + } + + return new NatsJSOpts(ToNatsOpts(), JetStreamDomain, JetStreamApiPrefix ?? "$JS.API"); + } +} + +public class JetStreamDefaults +{ + public string Retention { get; set; } = "limits"; + public TimeSpan? MaxAge { get; set; } = TimeSpan.FromDays(7); + public long? MaxMessages { get; set; } = 1_000_000; + public long? MaxBytes { get; set; } = 1024 * 1024 * 1024; + public int Replicas { get; set; } = 1; + public string AckPolicy { get; set; } = "explicit"; + public TimeSpan AckWait { get; set; } = TimeSpan.FromSeconds(30); + public int MaxDeliver { get; set; } = 3; + public TimeSpan DuplicateWindow { get; set; } = TimeSpan.FromMinutes(2); +} + +public interface ITenantIdResolver +{ + string? ResolveTenantId(Envelope envelope); +} + +public interface ISubjectResolver +{ + string ResolveSubject(string baseSubject, Envelope envelope); + string? ExtractTenantId(string subject); +} diff --git a/src/Transports/NATS/Wolverine.Nats/Configuration/StreamConfiguration.cs b/src/Transports/NATS/Wolverine.Nats/Configuration/StreamConfiguration.cs new file mode 100644 index 000000000..bf56e52f2 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Configuration/StreamConfiguration.cs @@ -0,0 +1,107 @@ +using NATS.Client.JetStream.Models; + +namespace Wolverine.Nats.Configuration; + +public class StreamConfiguration +{ + public string Name { get; set; } = string.Empty; + public List Subjects { get; set; } = new(); + public StreamConfigRetention Retention { get; set; } = StreamConfigRetention.Limits; + public StreamConfigStorage Storage { get; set; } = StreamConfigStorage.File; + public int? MaxMessages { get; set; } + public long? MaxBytes { get; set; } + public TimeSpan? MaxAge { get; set; } + public int? MaxMessagesPerSubject { get; set; } + public StreamConfigDiscard DiscardPolicy { get; set; } = StreamConfigDiscard.Old; + public int Replicas { get; set; } = 1; + public bool AllowRollup { get; set; } + public bool AllowDirect { get; set; } + public bool DenyDelete { get; set; } + public bool DenyPurge { get; set; } + + /// + /// Enable scheduled message delivery (requires NATS Server 2.12+) + /// Once enabled on a stream, this cannot be disabled. + /// + public bool AllowMsgSchedules { get; set; } + + /// + /// Add a subject to this stream + /// + public StreamConfiguration WithSubject(string subject) + { + if (!Subjects.Contains(subject)) + { + Subjects.Add(subject); + } + return this; + } + + /// + /// Add multiple subjects to this stream + /// + public StreamConfiguration WithSubjects(params string[] subjects) + { + foreach (var subject in subjects) + { + WithSubject(subject); + } + return this; + } + + /// + /// Configure retention limits + /// + public StreamConfiguration WithLimits( + int? maxMessages = null, + long? maxBytes = null, + TimeSpan? maxAge = null + ) + { + Retention = StreamConfigRetention.Limits; + if (maxMessages.HasValue) + { + MaxMessages = maxMessages; + } + + if (maxBytes.HasValue) + { + MaxBytes = maxBytes; + } + + if (maxAge.HasValue) + { + MaxAge = maxAge; + } + + return this; + } + + /// + /// Configure as work queue (retention by interest) + /// + public StreamConfiguration AsWorkQueue() + { + Retention = StreamConfigRetention.Interest; + return this; + } + + /// + /// Configure for high availability + /// + public StreamConfiguration WithReplicas(int replicas) + { + Replicas = replicas; + return this; + } + + /// + /// Enable scheduled message delivery (requires NATS Server 2.12+). + /// Once enabled on a stream, this cannot be disabled. + /// + public StreamConfiguration EnableScheduledDelivery() + { + AllowMsgSchedules = true; + return this; + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Extensions/NatsTransportExtensions.cs b/src/Transports/NATS/Wolverine.Nats/Extensions/NatsTransportExtensions.cs new file mode 100644 index 000000000..8c974d686 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Extensions/NatsTransportExtensions.cs @@ -0,0 +1,139 @@ +using JasperFx.Core.Reflection; +using Microsoft.Extensions.Configuration; +using Wolverine.Configuration; +using Wolverine.Nats.Configuration; +using Wolverine.Nats.Internal; + +namespace Wolverine.Nats; + +public static class NatsTransportExtensions +{ + /// + /// Get access to the NATS transport for advanced configuration + /// + internal static NatsTransport NatsTransport(this WolverineOptions options) + { + return options.Transports.GetOrCreate(); + } + + /// + /// Configure Wolverine to use NATS as a message transport + /// + public static NatsTransportExpression UseNats( + this WolverineOptions options, + string connectionString = "nats://localhost:4222" + ) + { + var transport = options.NatsTransport(); + transport.Configuration.ConnectionString = connectionString; + return new NatsTransportExpression(transport, options); + } + + /// + /// Configure Wolverine to use NATS as a message transport with custom configuration + /// + public static NatsTransportExpression UseNats( + this WolverineOptions options, + Action configure + ) + { + var transport = options.NatsTransport(); + configure(transport.Configuration); + return new NatsTransportExpression(transport, options); + } + + /// + /// Configure Wolverine to use NATS as a message transport using IConfiguration + /// Reads configuration from "Wolverine:Nats" section + /// + public static NatsTransportExpression UseNats( + this WolverineOptions options, + IConfiguration configuration + ) + { + var transport = options.NatsTransport(); + + // First try to bind from Wolverine:Nats section (proper nested configuration) + var wolverineSection = configuration.GetSection("Wolverine:Nats"); + if (wolverineSection.Exists()) + { + wolverineSection.Bind(transport.Configuration); + } + + // Fall back to root-level Nats section for backward compatibility + var natsSection = configuration.GetSection("Nats"); + if (natsSection.Exists() && !wolverineSection.Exists()) + { + natsSection.Bind(transport.Configuration); + } + + // Override with environment variable if set (for containers/cloud deployments) + var envUrl = Environment.GetEnvironmentVariable("NATS_URL"); + if (!string.IsNullOrEmpty(envUrl)) + { + transport.Configuration.ConnectionString = envUrl; + } + + return new NatsTransportExpression(transport, options); + } + + /// + /// Publish messages to a NATS subject + /// + public static NatsSubscriberConfiguration ToNatsSubject( + this IPublishToExpression publishing, + string subject + ) + { + var transports = publishing.As().Parent.Transports; + var transport = transports.GetOrCreate(); + + var endpoint = transport.EndpointForSubject(subject); + + // This is necessary to hook up the subscription rules + publishing.To(endpoint.Uri); + + return new NatsSubscriberConfiguration(endpoint); + } + + /// + /// Publish messages to a NATS subject + /// + public static NatsSubscriberConfiguration PublishToNatsSubject( + this WolverineOptions options, + string subject + ) + { + var transport = options.NatsTransport(); + var endpoint = transport.EndpointForSubject(subject); + + options.PublishMessage().To(endpoint.Uri); + + return new NatsSubscriberConfiguration(endpoint); + } + + /// + /// Listen to messages from a NATS subject + /// + public static NatsListenerConfiguration ListenToNatsSubject( + this WolverineOptions options, + string subject + ) + { + var transport = options.NatsTransport(); + var endpoint = transport.EndpointForSubject(subject); + endpoint.IsListener = true; + + return new NatsListenerConfiguration(endpoint); + } + + /// + /// Access the NATS transport configuration for advanced scenarios. + /// This is useful for adding policies or modifying configuration after initial setup. + /// + public static NatsTransportExpression ConfigureNats(this WolverineOptions options) + { + var transport = options.NatsTransport(); + return new NatsTransportExpression(transport, options); + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/ITenantSubjectMapper.cs b/src/Transports/NATS/Wolverine.Nats/ITenantSubjectMapper.cs new file mode 100644 index 000000000..e0b1147f6 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/ITenantSubjectMapper.cs @@ -0,0 +1,31 @@ +namespace Wolverine.Nats; + +/// +/// Interface for mapping tenant IDs to NATS subjects and extracting tenant IDs from subjects. +/// Implement this interface to provide custom tenant-subject mapping strategies. +/// +public interface ITenantSubjectMapper +{ + /// + /// Maps a base subject to a tenant-specific subject. + /// + /// The base subject without tenant information + /// The tenant ID to incorporate into the subject + /// The tenant-specific subject + string MapSubject(string baseSubject, string tenantId); + + /// + /// Extracts the tenant ID from a NATS subject. + /// + /// The full NATS subject that may contain tenant information + /// The tenant ID if found, otherwise null + string? ExtractTenantId(string subject); + + /// + /// Gets the subscription pattern for listening to all tenant subjects for a given base subject. + /// For example, if using prefix-based mapping, this might return "*.orders.>" for base subject "orders.>" + /// + /// The base subject pattern + /// The subscription pattern that matches all tenant variants + string GetSubscriptionPattern(string baseSubject); +} \ No newline at end of file diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/CoreNatsPublisher.cs b/src/Transports/NATS/Wolverine.Nats/Internal/CoreNatsPublisher.cs new file mode 100644 index 000000000..795528138 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/CoreNatsPublisher.cs @@ -0,0 +1,65 @@ +using Microsoft.Extensions.Logging; +using NATS.Client.Core; + +namespace Wolverine.Nats.Internal; + +/// +/// Core NATS publisher for sending messages +/// +internal class CoreNatsPublisher : INatsPublisher +{ + private readonly NatsConnection _connection; + private readonly ILogger _logger; + + public CoreNatsPublisher(NatsConnection connection, ILogger logger) + { + _connection = connection; + _logger = logger; + } + + public async ValueTask PingAsync(CancellationToken cancellation) + { + try + { + var pingSubject = $"_INBOX.wolverine.ping.{Guid.NewGuid():N}"; + await _connection.PublishAsync( + pingSubject, + Array.Empty(), + cancellationToken: cancellation + ); + return _connection.ConnectionState == NatsConnectionState.Open; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to ping NATS endpoint"); + return false; + } + } + + public async ValueTask PublishAsync( + string subject, + byte[] data, + NatsHeaders headers, + string? replyTo, + Envelope envelope, + CancellationToken cancellation + ) + { + await _connection.PublishAsync( + subject, + data, + headers, + replyTo, + cancellationToken: cancellation + ); + + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug( + "Message {MessageId} published via Core NATS to {Subject}", + envelope.Id, + subject + ); + } + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/CoreNatsSubscriber.cs b/src/Transports/NATS/Wolverine.Nats/Internal/CoreNatsSubscriber.cs new file mode 100644 index 000000000..1faa0d32e --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/CoreNatsSubscriber.cs @@ -0,0 +1,194 @@ +using Microsoft.Extensions.Logging; +using NATS.Client.Core; +using Wolverine.Transports; + +namespace Wolverine.Nats.Internal; + +internal class CoreNatsSubscriber : INatsSubscriber +{ + private readonly NatsEndpoint _endpoint; + private readonly NatsConnection _connection; + private readonly ILogger _logger; + private readonly NatsEnvelopeMapper _mapper; + private readonly string _subscriptionPattern; + private readonly List _subscriptions = new(); + private readonly List _consumerTasks = new(); + + public CoreNatsSubscriber( + NatsEndpoint endpoint, + NatsConnection connection, + ILogger logger, + NatsEnvelopeMapper mapper, + string? subscriptionPattern = null + ) + { + _endpoint = endpoint; + _connection = connection; + _logger = logger; + _mapper = mapper; + _subscriptionPattern = subscriptionPattern ?? endpoint.Subject; + } + + public bool SupportsNativeDeadLetterQueue => false; + + public async Task StartAsync( + IListener listener, + IReceiver receiver, + CancellationToken cancellation + ) + { + var patterns = new List(); + + if (_subscriptionPattern != _endpoint.Subject && _subscriptionPattern.StartsWith("*.")) + { + patterns.Add(_subscriptionPattern); + patterns.Add(_endpoint.Subject); + + _logger.LogInformation( + "Multi-tenant subscription: listening to patterns '{WildcardPattern}' and '{BaseSubject}' for subject '{Subject}'", + _subscriptionPattern, + _endpoint.Subject, + _endpoint.Subject + ); + } + else + { + patterns.Add(_subscriptionPattern); + + _logger.LogInformation( + "Starting Core NATS listener for pattern {Pattern} (base subject: {Subject}) with queue group {QueueGroup}", + _subscriptionPattern, + _endpoint.Subject, + _endpoint.QueueGroup ?? "(none)" + ); + } + + foreach (var pattern in patterns) + { + IAsyncDisposable subscription; + + if (!string.IsNullOrEmpty(_endpoint.QueueGroup)) + { + subscription = await _connection.SubscribeCoreAsync( + pattern, + _endpoint.QueueGroup, + cancellationToken: cancellation + ); + } + else + { + subscription = await _connection.SubscribeCoreAsync( + pattern, + cancellationToken: cancellation + ); + } + + _subscriptions.Add(subscription); + + var consumerTask = Task.Run( + async () => + { + try + { + await foreach ( + var msg in ((INatsSub)subscription).Msgs.ReadAllAsync(cancellation) + ) + { + try + { + // Skip messages without data + if (msg.Data == null || msg.Data.Length == 0) + { + continue; + } + + // Skip messages without headers or without message-type header. + // These are typically NATS protocol messages (JetStream acks, etc.) + // that should not be processed by Wolverine. + if (msg.Headers == null || !msg.Headers.ContainsKey("message-type")) + { + _logger.LogDebug( + "Skipping NATS message without message-type header from subject {Subject}. DataLength={DataLength}, HasHeaders={HasHeaders}", + msg.Subject, + msg.Data.Length, + msg.Headers != null + ); + continue; + } + + var envelope = new NatsEnvelope(msg, null); + _mapper.MapIncomingToEnvelope(envelope, msg); + + await receiver.ReceivedAsync(listener, envelope); + } + catch (Exception ex) + { + _logger.LogError( + ex, + "Error processing NATS message from subject {Subject}", + msg.Subject + ); + } + } + } + catch (OperationCanceledException) + { + _logger.LogDebug( + "NATS listener for pattern {Pattern} was cancelled", + pattern + ); + } + }, + cancellation + ); + + _consumerTasks.Add(consumerTask); + } + } + + public async Task RepublishAsync(NatsEnvelope envelope, CancellationToken cancellation) + { + // Increment attempts before republishing for requeue support + envelope.Attempts++; + + // Republish the message to the same subject for requeue support + var headers = new NatsHeaders(); + _mapper.MapEnvelopeToOutgoing(envelope, headers); + + await _connection.PublishAsync( + _endpoint.Subject, + envelope.Data, + headers, + cancellationToken: cancellation + ); + } + + public async ValueTask DisposeAsync() + { + // Dispose subscriptions first - this will cause ReadAllAsync to complete + foreach (var subscription in _subscriptions) + { + try + { + await subscription.DisposeAsync(); + } + catch (Exception) + { + // Ignore disposal errors + } + } + + // Wait for consumer tasks to complete - they should exit once subscriptions are disposed + if (_consumerTasks.Any()) + { + try + { + await Task.WhenAll(_consumerTasks); + } + catch (OperationCanceledException) + { + // Expected during shutdown + } + } + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/DefaultTenantSubjectMapper.cs b/src/Transports/NATS/Wolverine.Nats/Internal/DefaultTenantSubjectMapper.cs new file mode 100644 index 000000000..226240f52 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/DefaultTenantSubjectMapper.cs @@ -0,0 +1,50 @@ +namespace Wolverine.Nats.Internal; + +/// +/// Default implementation of ITenantSubjectMapper that uses a prefix-based approach. +/// Maps subjects like "orders.created" to "tenant1.orders.created" for tenant "tenant1". +/// +public class DefaultTenantSubjectMapper : ITenantSubjectMapper +{ + private readonly string _separator; + private readonly bool _normalizeSlashes; + + public DefaultTenantSubjectMapper(string separator = ".", bool normalizeSlashes = true) + { + _separator = separator; + _normalizeSlashes = normalizeSlashes; + } + + public string MapSubject(string baseSubject, string tenantId) + { + if (string.IsNullOrEmpty(tenantId)) + return baseSubject; + + var normalizedTenantId = _normalizeSlashes + ? tenantId.Replace('/', _separator[0]) + : tenantId; + + return $"{normalizedTenantId}{_separator}{baseSubject}"; + } + + public string? ExtractTenantId(string subject) + { + if (string.IsNullOrEmpty(subject)) + return null; + + var firstSeparatorIndex = subject.IndexOf(_separator, StringComparison.Ordinal); + if (firstSeparatorIndex <= 0) + return null; + + var tenantId = subject.Substring(0, firstSeparatorIndex); + + return _normalizeSlashes + ? tenantId.Replace(_separator[0], '/') + : tenantId; + } + + public string GetSubscriptionPattern(string baseSubject) + { + return $"*{_separator}{baseSubject}"; + } +} \ No newline at end of file diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/INatsPublisher.cs b/src/Transports/NATS/Wolverine.Nats/Internal/INatsPublisher.cs new file mode 100644 index 000000000..8b0294bd2 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/INatsPublisher.cs @@ -0,0 +1,19 @@ +using NATS.Client.Core; + +namespace Wolverine.Nats.Internal; + +/// +/// Internal interface for NATS publishers +/// +internal interface INatsPublisher +{ + ValueTask PingAsync(CancellationToken cancellation); + ValueTask PublishAsync( + string subject, + byte[] data, + NatsHeaders headers, + string? replyTo, + Envelope envelope, + CancellationToken cancellation + ); +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/INatsSubscriber.cs b/src/Transports/NATS/Wolverine.Nats/Internal/INatsSubscriber.cs new file mode 100644 index 000000000..269bd6baa --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/INatsSubscriber.cs @@ -0,0 +1,17 @@ +using Wolverine.Transports; + +namespace Wolverine.Nats.Internal; + +/// +/// Internal interface for NATS subscribers +/// +internal interface INatsSubscriber : IAsyncDisposable +{ + Task StartAsync(IListener listener, IReceiver receiver, CancellationToken cancellation); + bool SupportsNativeDeadLetterQueue { get; } + + /// + /// Republish a message to the subject for requeue support in Core NATS + /// + Task RepublishAsync(NatsEnvelope envelope, CancellationToken cancellation); +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/JetStreamPublisher.cs b/src/Transports/NATS/Wolverine.Nats/Internal/JetStreamPublisher.cs new file mode 100644 index 000000000..267dbbee6 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/JetStreamPublisher.cs @@ -0,0 +1,110 @@ +using Microsoft.Extensions.Logging; +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Net; + +namespace Wolverine.Nats.Internal; + +/// +/// JetStream publisher for sending messages with durability +/// +internal class JetStreamPublisher : INatsPublisher +{ + private readonly NatsConnection _connection; + private readonly INatsJSContext _jetStreamContext; + private readonly ILogger _logger; + + public JetStreamPublisher(NatsConnection connection, ILogger logger) + { + _connection = connection; + _logger = logger; + _jetStreamContext = connection.CreateJetStreamContext(); + } + + public async ValueTask PingAsync(CancellationToken cancellation) + { + try + { + var pingSubject = $"_INBOX.wolverine.ping.{Guid.NewGuid():N}"; + await _connection.PublishAsync( + pingSubject, + Array.Empty(), + cancellationToken: cancellation + ); + return _connection.ConnectionState == NatsConnectionState.Open; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to ping NATS endpoint"); + return false; + } + } + + public async ValueTask PublishAsync( + string subject, + byte[] data, + NatsHeaders headers, + string? replyTo, + Envelope envelope, + CancellationToken cancellation + ) + { + if (envelope.IsResponse) + { + await _connection.PublishAsync( + subject, + data, + headers, + replyTo, + cancellationToken: cancellation + ); + + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug( + "Reply message {MessageId} published via Core NATS to {Subject}", + envelope.Id, + subject + ); + } + } + else + { + // Check if this is a scheduled message + if (envelope.ScheduledTime.HasValue) + { + // Add NATS scheduling headers + // Format: @at + var scheduledTime = envelope.ScheduledTime.Value.ToUniversalTime(); + headers["Nats-Schedule"] = $"@at {scheduledTime:O}"; + headers["Nats-Schedule-Target"] = subject; + + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug( + "Scheduling message {MessageId} for delivery at {ScheduledTime} to {Subject}", + envelope.Id, + scheduledTime, + subject + ); + } + } + + var ack = await _jetStreamContext.PublishAsync( + subject, + data, + headers: headers, + cancellationToken: cancellation + ); + + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug( + "Message {MessageId} published to JetStream with sequence {Sequence}", + envelope.Id, + ack.Seq + ); + } + } + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/JetStreamSubscriber.cs b/src/Transports/NATS/Wolverine.Nats/Internal/JetStreamSubscriber.cs new file mode 100644 index 000000000..da4778769 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/JetStreamSubscriber.cs @@ -0,0 +1,192 @@ +using Microsoft.Extensions.Logging; +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.JetStream.Models; +using NATS.Net; +using Wolverine.Transports; + +namespace Wolverine.Nats.Internal; + +internal class JetStreamSubscriber : INatsSubscriber +{ + private readonly NatsEndpoint _endpoint; + private readonly NatsConnection _connection; + private readonly ILogger _logger; + private readonly JetStreamEnvelopeMapper _mapper; + private readonly string _subscriptionPattern; + private readonly INatsJSContext _jetStreamContext; + private INatsJSConsumer? _consumer; + private Task? _consumerTask; + + public JetStreamSubscriber( + NatsEndpoint endpoint, + NatsConnection connection, + ILogger logger, + JetStreamEnvelopeMapper mapper, + string? subscriptionPattern = null + ) + { + _endpoint = endpoint; + _connection = connection; + _logger = logger; + _mapper = mapper; + _subscriptionPattern = subscriptionPattern ?? endpoint.Subject; + _jetStreamContext = connection.CreateJetStreamContext(); + } + + public bool SupportsNativeDeadLetterQueue => _endpoint.DeadLetterQueueEnabled; + + public async Task StartAsync( + IListener listener, + IReceiver receiver, + CancellationToken cancellation + ) + { + _logger.LogInformation( + "Starting JetStream listener for stream {Stream}, consumer {Consumer}, pattern {Pattern} (base subject: {Subject})", + _endpoint.StreamName, + _endpoint.ConsumerName ?? "(ephemeral)", + _subscriptionPattern, + _endpoint.Subject + ); + + var config = new ConsumerConfig + { + AckPolicy = ConsumerConfigAckPolicy.Explicit, + MaxDeliver = _endpoint.MaxDeliveryAttempts, + AckWait = TimeSpan.FromSeconds(30) + }; + + if (string.IsNullOrEmpty(_endpoint.ConsumerName)) + { + config.FilterSubject = _subscriptionPattern; + } + + if (!string.IsNullOrEmpty(_endpoint.ConsumerName)) + { + config.Name = _endpoint.ConsumerName; + config.DurableName = _endpoint.ConsumerName; + + if (!string.IsNullOrEmpty(_endpoint.QueueGroup)) + { + config.DeliverGroup = _endpoint.QueueGroup; + } + + try + { + _consumer = await _jetStreamContext.GetConsumerAsync( + _endpoint.StreamName!, + _endpoint.ConsumerName, + cancellation + ); + _logger.LogInformation( + "Using existing consumer {Consumer}", + _endpoint.ConsumerName + ); + } + catch (NatsJSException) + { + _consumer = await _jetStreamContext.CreateOrUpdateConsumerAsync( + _endpoint.StreamName!, + config, + cancellation + ); + _logger.LogInformation("Created consumer {Consumer}", _endpoint.ConsumerName); + } + } + else + { + _consumer = await _jetStreamContext.CreateOrUpdateConsumerAsync( + _endpoint.StreamName!, + config, + cancellation + ); + _logger.LogInformation( + "Created ephemeral consumer for subject {Subject}", + _endpoint.Subject + ); + } + + _consumerTask = Task.Run( + async () => + { + await foreach ( + var msg in _consumer!.ConsumeAsync(cancellationToken: cancellation) + ) + { + try + { + // Skip messages without data + if (msg.Data == null || msg.Data.Length == 0) + { + _logger.LogDebug( + "Skipping empty JetStream message from subject {Subject}", + msg.Subject + ); + await msg.AckAsync(cancellationToken: cancellation); + continue; + } + + // Skip messages without headers or without message-type header. + // These are typically NATS protocol messages that should not be processed by Wolverine. + if (msg.Headers == null || !msg.Headers.ContainsKey("message-type")) + { + await msg.AckAsync(cancellationToken: cancellation); + continue; + } + + var envelope = new NatsEnvelope(null, msg); + _mapper.MapIncomingToEnvelope(envelope, msg); + + await receiver.ReceivedAsync(listener, envelope); + } + catch (Exception ex) + { + _logger.LogError( + ex, + "Error processing JetStream message from subject {Subject}", + msg.Subject + ); + } + } + }, + cancellation + ); + } + + public Task RepublishAsync(NatsEnvelope envelope, CancellationToken cancellation) + { + // JetStream uses native NAK for requeue, so this is not needed + // This method is only called for Core NATS + return Task.CompletedTask; + } + + public async ValueTask DisposeAsync() + { + // Dispose consumer first - this will cause ConsumeAsync to complete + if (_consumer is IAsyncDisposable disposableConsumer) + { + try + { + await disposableConsumer.DisposeAsync(); + } + catch (Exception) + { + // Ignore disposal errors + } + } + + // Wait for consumer task to complete - it should exit once consumer is disposed + if (_consumerTask != null) + { + try + { + await _consumerTask; + } + catch (OperationCanceledException) + { + // Expected during shutdown + } + } + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/NatsEndpoint.cs b/src/Transports/NATS/Wolverine.Nats/Internal/NatsEndpoint.cs new file mode 100644 index 000000000..4546f1640 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/NatsEndpoint.cs @@ -0,0 +1,318 @@ +using Microsoft.Extensions.Logging; +using NATS.Client.Core; +using NATS.Client.JetStream.Models; +using NATS.Net; +using Wolverine.Configuration; +using Wolverine.Runtime; +using Wolverine.Transports; +using Wolverine.Transports.Sending; + +namespace Wolverine.Nats.Internal; + +public class NatsEndpoint : Endpoint, IBrokerEndpoint +{ + private readonly NatsTransport _transport; + private NatsConnection? _connection; + private ILogger? _logger; + private NatsEnvelopeMapper? _mapper; + + public NatsEndpoint(string subject, NatsTransport transport, EndpointRole role) + : base(new Uri($"nats://subject/{subject}"), role) + { + Subject = subject; + _transport = transport; + + EndpointName = subject; + Mode = EndpointMode.BufferedInMemory; + } + + public string Subject { get; } + public object? NatsSerializer { get; set; } + public Dictionary CustomHeaders { get; set; } = new(); + public string? QueueGroup { get; set; } + public string? StreamName { get; set; } + public string? ConsumerName { get; set; } + public bool UseJetStream { get; set; } + public bool DeadLetterQueueEnabled { get; set; } = true; + public string? DeadLetterSubject { get; set; } + public int MaxDeliveryAttempts { get; set; } = 5; + + protected override bool supportsMode(EndpointMode mode) + { + return mode switch + { + EndpointMode.Inline => true, + EndpointMode.BufferedInMemory => true, + EndpointMode.Durable => UseJetStream, + _ => false + }; + } + + protected override ISender CreateSender(IWolverineRuntime runtime) + { + _connection = _transport.Connection; + _logger = runtime.LoggerFactory.CreateLogger(); + _mapper = new NatsEnvelopeMapper(this); + + if (MessageType != null) + { + _mapper.ReceivesMessage(MessageType); + } + + var useJetStream = UseJetStream && _transport.Configuration.EnableJetStream; + var supportsScheduledSend = useJetStream && + _transport.ServerSupportsScheduledSend && + StreamName != null && + _transport.Configuration.Streams.TryGetValue(StreamName, out var streamConfig) && + streamConfig.AllowMsgSchedules; + + var baseSender = NatsSender.Create( + this, + _connection, + _logger, + _mapper, + runtime.Cancellation, + useJetStream, + supportsScheduledSend + ); + + if (_transport.Tenants.Any() && TenancyBehavior == TenancyBehavior.TenantAware) + { + var tenantedSender = new TenantedSender(Uri, _transport.TenantedIdBehavior, baseSender); + + foreach (var tenant in _transport.Tenants) + { + var subjectMapper = tenant.SubjectMapper ?? _transport.TenantSubjectMapper; + var tenantSubject = subjectMapper.MapSubject(Subject, tenant.TenantId); + + var tenantEndpoint = new NatsEndpoint(tenantSubject, _transport, Role) + { + UseJetStream = UseJetStream, + StreamName = StreamName, + ConsumerName = ConsumerName, + QueueGroup = QueueGroup, + DeadLetterQueueEnabled = DeadLetterQueueEnabled, + DeadLetterSubject = DeadLetterSubject, + MaxDeliveryAttempts = MaxDeliveryAttempts, + MessageType = MessageType, + CustomHeaders = CustomHeaders, + NatsSerializer = NatsSerializer + }; + + var tenantSender = NatsSender.Create( + tenantEndpoint, + _connection, + _logger, + _mapper, + runtime.Cancellation, + useJetStream, + supportsScheduledSend + ); + + tenantedSender.RegisterSender(tenant.TenantId, tenantSender); + } + + return tenantedSender; + } + + return baseSender; + } + + public override async ValueTask BuildListenerAsync( + IWolverineRuntime runtime, + IReceiver receiver + ) + { + _connection = _transport.Connection; + _logger = runtime.LoggerFactory.CreateLogger(); + + ISender? deadLetterSender = null; + if (!string.IsNullOrEmpty(DeadLetterSubject)) + { + var dlqEndpoint = _transport.EndpointForSubject(DeadLetterSubject); + deadLetterSender = (ISender)runtime.Endpoints.GetOrBuildSendingAgent(dlqEndpoint.Uri); + } + + string subscriptionPattern = Subject; + ITenantSubjectMapper? tenantMapper = null; + + if (_transport.Tenants.Any() && TenancyBehavior == TenancyBehavior.TenantAware) + { + tenantMapper = _transport.TenantSubjectMapper; + subscriptionPattern = tenantMapper.GetSubscriptionPattern(Subject); + } + + var listener = NatsListener.Create( + this, + _connection, + runtime, + receiver, + _logger, + deadLetterSender, + runtime.Cancellation, + UseJetStream && _transport.Configuration.EnableJetStream, + subscriptionPattern, + tenantMapper + ); + + await listener.StartAsync(); + + return listener; + } + + public NatsHeaders BuildHeaders(Envelope envelope) + { + var headers = new NatsHeaders(); + _mapper?.MapEnvelopeToOutgoing(envelope, headers); + + foreach (var header in CustomHeaders) + { + headers[header.Key] = header.Value; + } + + return headers; + } + + public async ValueTask CheckAsync() + { + _connection ??= _transport.Connection; + + if (_connection == null || _connection.ConnectionState != NatsConnectionState.Open) + { + return false; + } + + if (!UseJetStream || string.IsNullOrEmpty(StreamName)) + { + return true; + } + + try + { + var js = _connection.CreateJetStreamContext(); + var stream = await js.GetStreamAsync( + StreamName, + cancellationToken: CancellationToken.None + ); + return stream != null; + } + catch + { + return false; + } + } + + public async ValueTask TeardownAsync(ILogger logger) + { + _connection ??= _transport.Connection; + + if (_connection == null || _connection.ConnectionState != NatsConnectionState.Open) + { + return; + } + + if (!UseJetStream || string.IsNullOrEmpty(StreamName)) + { + return; + } + + if (!string.IsNullOrEmpty(ConsumerName)) + { + try + { + var js = _connection.CreateJetStreamContext(); + await js.DeleteConsumerAsync(StreamName, ConsumerName); + logger.LogInformation( + "Deleted consumer {Consumer} from stream {Stream}", + ConsumerName, + StreamName + ); + } + catch (Exception ex) + { + logger.LogWarning( + ex, + "Failed to delete consumer {Consumer} from stream {Stream}", + ConsumerName, + StreamName + ); + } + } + } + + public async ValueTask SetupAsync(ILogger logger) + { + _connection ??= _transport.Connection; + + if (_connection == null || _connection.ConnectionState != NatsConnectionState.Open) + { + throw new InvalidOperationException("NATS connection is not available or not open"); + } + + if (!UseJetStream || string.IsNullOrEmpty(StreamName)) + { + logger.LogInformation("Using Core NATS for subject {Subject}", Subject); + return; + } + + var js = _connection.CreateJetStreamContext(); + + try + { + var stream = await js.GetStreamAsync(StreamName); + logger.LogInformation("Using existing JetStream stream {Stream}", StreamName); + } + catch + { + var subjects = new List { Subject }; + + if (_transport.Tenants.Any() && TenancyBehavior == TenancyBehavior.TenantAware) + { + var wildcardPattern = _transport.TenantSubjectMapper.GetSubscriptionPattern(Subject); + if (wildcardPattern != Subject) + { + subjects.Add(wildcardPattern); + } + } + + logger.LogInformation( + "Creating JetStream stream {Stream} for subjects {Subjects}", + StreamName, + string.Join(", ", subjects) + ); + + var config = new StreamConfig(StreamName, subjects) + { + Retention = StreamConfigRetention.Workqueue, + Discard = StreamConfigDiscard.Old, + MaxAge = TimeSpan.FromDays(1), + DuplicateWindow = TimeSpan.FromMinutes(2), + MaxMsgs = 1_000_000 + }; + + await js.CreateStreamAsync(config); + logger.LogInformation("Created JetStream stream {Stream}", StreamName); + } + + if (!string.IsNullOrEmpty(ConsumerName) && Role == EndpointRole.Application) + { + var consumerConfig = new ConsumerConfig + { + Name = ConsumerName, + DurableName = ConsumerName, + FilterSubject = Subject, + AckPolicy = ConsumerConfigAckPolicy.Explicit, + AckWait = TimeSpan.FromSeconds(30), + MaxDeliver = MaxDeliveryAttempts, + ReplayPolicy = ConsumerConfigReplayPolicy.Instant + }; + + await js.CreateOrUpdateConsumerAsync(StreamName, consumerConfig); + logger.LogInformation( + "Created/updated consumer {Consumer} on stream {Stream}", + ConsumerName, + StreamName + ); + } + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/NatsEnvelope.cs b/src/Transports/NATS/Wolverine.Nats/Internal/NatsEnvelope.cs new file mode 100644 index 000000000..92a1f09e3 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/NatsEnvelope.cs @@ -0,0 +1,16 @@ +using NATS.Client.Core; +using NATS.Client.JetStream; + +namespace Wolverine.Nats.Internal; + +public class NatsEnvelope : Envelope +{ + public NatsEnvelope(NatsMsg? coreMsg, INatsJSMsg? jetStreamMsg) + { + CoreMsg = coreMsg; + JetStreamMsg = jetStreamMsg; + } + + public NatsMsg? CoreMsg { get; } + public INatsJSMsg? JetStreamMsg { get; } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/NatsEnvelopeMapper.cs b/src/Transports/NATS/Wolverine.Nats/Internal/NatsEnvelopeMapper.cs new file mode 100644 index 000000000..6bd91365b --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/NatsEnvelopeMapper.cs @@ -0,0 +1,157 @@ +using NATS.Client.Core; +using NATS.Client.JetStream; +using Wolverine.Runtime.Serialization; +using Wolverine.Transports; + +namespace Wolverine.Nats.Internal; + +public class NatsEnvelopeMapper : EnvelopeMapper, NatsHeaders> +{ + private readonly ITenantSubjectMapper? _tenantMapper; + + public NatsEnvelopeMapper(NatsEndpoint endpoint, ITenantSubjectMapper? tenantMapper = null) + : base(endpoint) + { + _tenantMapper = tenantMapper; + } + + protected override void writeOutgoingHeader(NatsHeaders headers, string key, string value) + { + headers[key] = value; + } + + protected override bool tryReadIncomingHeader( + NatsMsg incoming, + string key, + out string? value + ) + { + value = null; + + if (incoming.Headers == null) + { + return false; + } + + if (incoming.Headers.TryGetValue(key, out var values)) + { + value = values.ToString(); + return true; + } + + return false; + } + + protected override void writeIncomingHeaders(NatsMsg incoming, Envelope envelope) + { + envelope.Data = incoming.Data; + envelope.Destination = new Uri($"nats://subject/{incoming.Subject}"); + + if (_tenantMapper != null) + { + var tenantId = _tenantMapper.ExtractTenantId(incoming.Subject); + if (tenantId != null) + { + envelope.TenantId = tenantId; + } + } + + if (!string.IsNullOrEmpty(incoming.ReplyTo)) + { + EnvelopeSerializer.ReadDataElement( + envelope, + EnvelopeConstants.ReplyUriKey, + $"nats://subject/{incoming.ReplyTo}" + ); + } + + if (incoming.Headers != null) + { + foreach (var header in incoming.Headers) + { + envelope.Headers[header.Key] = header.Value; + } + } + } +} + +public class JetStreamEnvelopeMapper : EnvelopeMapper, NatsHeaders> +{ + private readonly ITenantSubjectMapper? _tenantMapper; + + public JetStreamEnvelopeMapper(NatsEndpoint endpoint, ITenantSubjectMapper? tenantMapper = null) + : base(endpoint) + { + _tenantMapper = tenantMapper; + } + + protected override void writeOutgoingHeader(NatsHeaders headers, string key, string value) + { + headers[key] = value; + } + + protected override bool tryReadIncomingHeader( + INatsJSMsg incoming, + string key, + out string? value + ) + { + value = null; + + if (incoming.Headers == null) + { + return false; + } + + if (incoming.Headers.TryGetValue(key, out var values)) + { + value = values.ToString(); + return true; + } + + return false; + } + + protected override void writeIncomingHeaders(INatsJSMsg incoming, Envelope envelope) + { + envelope.Data = incoming.Data; + envelope.Destination = new Uri($"nats://subject/{incoming.Subject}"); + + if (_tenantMapper != null) + { + var tenantId = _tenantMapper.ExtractTenantId(incoming.Subject); + if (tenantId != null) + { + envelope.TenantId = tenantId; + } + } + + if (!string.IsNullOrEmpty(incoming.ReplyTo)) + { + EnvelopeSerializer.ReadDataElement( + envelope, + EnvelopeConstants.ReplyUriKey, + $"nats://subject/{incoming.ReplyTo}" + ); + } + + if (incoming.Metadata != null) + { + var metadata = incoming.Metadata.Value; + envelope.Headers["nats-stream"] = metadata.Stream; + envelope.Headers["nats-consumer"] = metadata.Consumer; + envelope.Headers["nats-delivered"] = metadata.NumDelivered.ToString(); + envelope.Headers["nats-pending"] = metadata.NumPending.ToString(); + envelope.Headers["nats-stream-seq"] = metadata.Sequence.Stream.ToString(); + envelope.Headers["nats-consumer-seq"] = metadata.Sequence.Consumer.ToString(); + } + + if (incoming.Headers != null) + { + foreach (var header in incoming.Headers) + { + envelope.Headers[header.Key] = header.Value; + } + } + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/NatsListener.cs b/src/Transports/NATS/Wolverine.Nats/Internal/NatsListener.cs new file mode 100644 index 000000000..ed40ec6c7 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/NatsListener.cs @@ -0,0 +1,211 @@ +using JasperFx.Blocks; +using Microsoft.Extensions.Logging; +using NATS.Client.Core; +using Wolverine.Runtime; +using Wolverine.Transports; +using Wolverine.Transports.Sending; + +namespace Wolverine.Nats.Internal; + +public class NatsListener : IListener, ISupportDeadLetterQueue +{ + private readonly NatsEndpoint _endpoint; + private readonly IWolverineRuntime _runtime; + private readonly IReceiver _receiver; + private readonly ILogger _logger; + private readonly CancellationTokenSource _cancellation; + private readonly RetryBlock _complete; + private readonly RetryBlock _defer; + private readonly INatsSubscriber _subscriber; + private readonly ISender _deadLetterSender; + + public IHandlerPipeline? Pipeline { get; private set; } + + internal NatsListener( + NatsEndpoint endpoint, + INatsSubscriber subscriber, + IWolverineRuntime runtime, + IReceiver receiver, + ILogger logger, + ISender deadLetterSender, + CancellationToken parentCancellation + ) + { + _endpoint = endpoint; + _subscriber = subscriber; + _runtime = runtime; + _receiver = receiver; + _logger = logger; + _deadLetterSender = deadLetterSender; + _cancellation = CancellationTokenSource.CreateLinkedTokenSource(parentCancellation); + Address = endpoint.Uri; + + _complete = new RetryBlock( + async (envelope, _) => + { + if (envelope.JetStreamMsg != null) + { + await envelope.JetStreamMsg.AckAsync( + cancellationToken: _cancellation.Token + ); + } + }, + logger, + _cancellation.Token + ); + + _defer = new RetryBlock( + async (envelope, _) => + { + if (envelope.JetStreamMsg != null) + { + // JetStream supports native NAK which will redeliver the message + await envelope.JetStreamMsg.NakAsync( + cancellationToken: _cancellation.Token + ); + } + else + { + // Core NATS doesn't have native requeue - republish the message to the subject + await _subscriber.RepublishAsync(envelope, _cancellation.Token); + } + }, + logger, + _cancellation.Token + ); + } + + public Uri Address { get; } + + public bool NativeDeadLetterQueueEnabled => _subscriber.SupportsNativeDeadLetterQueue; + + public async Task MoveToErrorsAsync(Envelope envelope, Exception exception) + { + if (envelope is NatsEnvelope natsEnvelope) + { + if (NativeDeadLetterQueueEnabled && natsEnvelope.JetStreamMsg != null) + { + var metadata = natsEnvelope.JetStreamMsg.Metadata; + + if (metadata?.NumDelivered >= (ulong)_endpoint.MaxDeliveryAttempts) + { + await natsEnvelope.JetStreamMsg.AckAsync( + cancellationToken: _cancellation.Token + ); + + if (!string.IsNullOrEmpty(_endpoint.DeadLetterSubject)) + { + envelope.Attempts = (int)(metadata?.NumDelivered ?? 1); + + envelope.Headers["x-dlq-reason"] = exception.Message; + envelope.Headers["x-dlq-timestamp"] = DateTimeOffset.UtcNow.ToString("O"); + envelope.Headers["x-dlq-original-subject"] = _endpoint.Subject; + envelope.Headers["x-dlq-attempts"] = envelope.Attempts.ToString(); + envelope.Headers["x-dlq-exception-type"] = + exception.GetType().FullName ?? "Unknown"; + + await _deadLetterSender.SendAsync(envelope); + } + + _logger.LogError( + exception, + "Message {MessageId} moved to dead letter queue after {Attempts} attempts. Subject: {Subject}", + envelope.Id, + metadata?.NumDelivered ?? 1, + _endpoint.DeadLetterSubject + ); + } + } + } + } + + public async ValueTask CompleteAsync(Envelope envelope) + { + if (envelope is NatsEnvelope natsEnvelope) + { + await _complete.PostAsync(natsEnvelope); + } + } + + public async ValueTask DeferAsync(Envelope envelope) + { + if (envelope is NatsEnvelope natsEnvelope) + { + await _defer.PostAsync(natsEnvelope); + } + } + + public async Task StartAsync() + { + await _subscriber.StartAsync(this, _receiver, _cancellation.Token); + } + + public async ValueTask StopAsync() + { + _cancellation.Cancel(); + await _subscriber.DisposeAsync(); + } + + public void Dispose() + { + _cancellation.Cancel(); + _cancellation.Dispose(); + } + + public async ValueTask DisposeAsync() + { + if (!_cancellation.IsCancellationRequested) + { + _cancellation.Cancel(); + await _subscriber.DisposeAsync(); + } + + _complete.Dispose(); + _defer.Dispose(); + _cancellation.Dispose(); + } + + internal static NatsListener Create( + NatsEndpoint endpoint, + NatsConnection connection, + IWolverineRuntime runtime, + IReceiver receiver, + ILogger logger, + ISender? deadLetterSender, + CancellationToken cancellation, + bool useJetStream, + string? subscriptionPattern = null, + ITenantSubjectMapper? tenantMapper = null + ) + { + INatsSubscriber subscriber; + if (useJetStream) + { + var jsMapper = new JetStreamEnvelopeMapper(endpoint, tenantMapper); + if (endpoint.MessageType != null) + { + jsMapper.ReceivesMessage(endpoint.MessageType); + } + subscriber = new JetStreamSubscriber(endpoint, connection, logger, jsMapper, subscriptionPattern); + } + else + { + var mapper = new NatsEnvelopeMapper(endpoint, tenantMapper); + if (endpoint.MessageType != null) + { + mapper.ReceivesMessage(endpoint.MessageType); + } + subscriber = new CoreNatsSubscriber(endpoint, connection, logger, mapper, subscriptionPattern); + } + + return new NatsListener( + endpoint, + subscriber, + runtime, + receiver, + logger, + deadLetterSender ?? new NullSender(), + cancellation + ); + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/NatsSender.cs b/src/Transports/NATS/Wolverine.Nats/Internal/NatsSender.cs new file mode 100644 index 000000000..4333abe1e --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/NatsSender.cs @@ -0,0 +1,141 @@ +using Microsoft.Extensions.Logging; +using NATS.Client.Core; +using Wolverine.Transports.Sending; + +namespace Wolverine.Nats.Internal; + +public class NatsSender : ISender +{ + private readonly NatsEndpoint _endpoint; + private readonly ILogger _logger; + private readonly NatsEnvelopeMapper _mapper; + private readonly CancellationToken _cancellation; + private readonly INatsPublisher _publisher; + private readonly bool _supportsNativeScheduledSend; + + internal NatsSender( + NatsEndpoint endpoint, + INatsPublisher publisher, + ILogger logger, + NatsEnvelopeMapper mapper, + CancellationToken cancellation, + bool supportsNativeScheduledSend + ) + { + _endpoint = endpoint; + _publisher = publisher; + _logger = logger; + _mapper = mapper; + _cancellation = cancellation; + _supportsNativeScheduledSend = supportsNativeScheduledSend; + Destination = endpoint.Uri; + } + + internal static NatsSender Create( + NatsEndpoint endpoint, + NatsConnection connection, + ILogger logger, + NatsEnvelopeMapper mapper, + CancellationToken cancellation, + bool useJetStream, + bool supportsNativeScheduledSend + ) + { + INatsPublisher publisher = useJetStream + ? new JetStreamPublisher(connection, logger) + : new CoreNatsPublisher(connection, logger); + + return new NatsSender(endpoint, publisher, logger, mapper, cancellation, supportsNativeScheduledSend); + } + + public bool SupportsNativeScheduledSend => _supportsNativeScheduledSend; + public Uri Destination { get; } + + public async Task PingAsync() + { + return await _publisher.PingAsync(_cancellation); + } + + public async ValueTask SendAsync(Envelope envelope) + { + try + { + var headers = new NatsHeaders(); + _mapper.MapEnvelopeToOutgoing(envelope, headers); + + foreach (var header in _endpoint.CustomHeaders) + { + headers[header.Key] = header.Value; + } + + var data = envelope.Data ?? Array.Empty(); + + var targetSubject = _endpoint.Subject; + string? replyTo = null; + + if (envelope.IsResponse && envelope.Destination != null) + { + // For response messages, Wolverine sets Destination to the original sender's reply URI + // We need to use Destination, not ReplyUri (which would be our own reply endpoint) + targetSubject = NatsTransport.ExtractSubjectFromUri(envelope.Destination); + + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug( + "Sending reply message {MessageId} to reply subject {ReplySubject}", + envelope.Id, + targetSubject + ); + } + } + else + { + if (envelope.ReplyRequested != null && envelope.ReplyUri != null) + { + replyTo = NatsTransport.ExtractSubjectFromUri(envelope.ReplyUri); + + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug( + "Sending request message {MessageId} to NATS subject {Subject} with reply-to {ReplyTo} (expecting {ReplyType})", + envelope.Id, + targetSubject, + replyTo, + envelope.ReplyRequested + ); + } + } + else + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug( + "Sending message {MessageId} to NATS subject {Subject}", + envelope.Id, + targetSubject + ); + } + } + } + + await _publisher.PublishAsync( + targetSubject, + data, + headers, + replyTo, + envelope, + _cancellation + ); + } + catch (Exception ex) + { + _logger.LogError( + ex, + "Failed to send message {MessageId} to NATS subject {Subject}", + envelope.Id, + _endpoint.Subject + ); + throw; + } + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/NatsTenant.cs b/src/Transports/NATS/Wolverine.Nats/Internal/NatsTenant.cs new file mode 100644 index 000000000..27fba3e2d --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/NatsTenant.cs @@ -0,0 +1,17 @@ +namespace Wolverine.Nats.Internal; + +public class NatsTenant +{ + public NatsTenant(string tenantId) + { + TenantId = tenantId ?? throw new ArgumentNullException(nameof(tenantId)); + } + + public string TenantId { get; } + public ITenantSubjectMapper? SubjectMapper { get; set; } + public string? ConnectionString { get; set; } + public string? CredentialsFile { get; set; } + public string? Username { get; set; } + public string? Password { get; set; } + public string? Token { get; set; } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/NatsTransport.cs b/src/Transports/NATS/Wolverine.Nats/Internal/NatsTransport.cs new file mode 100644 index 000000000..4f45eda91 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/NatsTransport.cs @@ -0,0 +1,223 @@ +using Microsoft.Extensions.Logging; +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.JetStream.Models; +using NATS.Net; +using Wolverine.Configuration; +using Wolverine.Nats.Configuration; +using Wolverine.Runtime; +using Wolverine.Transports; + +namespace Wolverine.Nats.Internal; + +public class NatsTransport : BrokerTransport, IAsyncDisposable +{ + public const string ProtocolName = "nats"; + + private readonly JasperFx.Core.LightweightCache _endpoints = new(); + private NatsConnection? _connection; + private INatsJSContext? _jetStreamContext; + private ILogger? _logger; + + /// + /// Minimum NATS server version required for scheduled message delivery + /// + private static readonly Version MinScheduledSendVersion = new(2, 12, 0); + + /// + /// Whether the connected NATS server supports scheduled message delivery (v2.12+) + /// + public bool ServerSupportsScheduledSend { get; private set; } + + internal JasperFx.Core.LightweightCache Tenants { get; } = new(); + internal ITenantSubjectMapper TenantSubjectMapper { get; set; } = new DefaultTenantSubjectMapper(); + + public NatsTransport() + : base(ProtocolName, "NATS Transport") + { + _endpoints.OnMissing = subject => + { + var normalized = NormalizeSubject(subject); + return new NatsEndpoint(normalized, this, EndpointRole.Application); + }; + } + + public override Uri ResourceUri => + Configuration.ConnectionString != null + ? new Uri(Configuration.ConnectionString) + : new Uri("nats://localhost:4222"); + + public string ResponseSubject { get; private set; } = "wolverine.response"; + + public NatsTransportConfiguration Configuration { get; } = new(); + + public NatsConnection Connection => + _connection ?? throw new InvalidOperationException("NATS connection not initialized"); + + public INatsJSContext JetStreamContext => + _jetStreamContext + ?? throw new InvalidOperationException("JetStream context not initialized"); + + protected override IEnumerable endpoints() => _endpoints; + + protected override NatsEndpoint findEndpointByUri(Uri uri) + { + var subject = ExtractSubjectFromUri(uri); + return _endpoints[subject]; + } + + public override Endpoint ReplyEndpoint() + { + return _endpoints[ResponseSubject]; + } + + public override async ValueTask ConnectAsync(IWolverineRuntime runtime) + { + _logger = runtime.LoggerFactory.CreateLogger(); + + ResponseSubject = $"wolverine.response.{runtime.Options.Durability.AssignedNodeNumber}"; + var responseEndpoint = _endpoints[ResponseSubject]; + responseEndpoint.IsUsedForReplies = true; + responseEndpoint.IsListener = true; + + var natsOpts = Configuration.ToNatsOpts(); + natsOpts = natsOpts with { Name = $"wolverine-{runtime.Options.ServiceName}" }; + _connection = new NatsConnection(natsOpts); + await _connection.ConnectAsync(); + + _logger.LogInformation("Connected to NATS at {Url}", Configuration.ConnectionString); + + // Check server version for scheduled send support + if (_connection.ServerInfo?.Version != null && + Version.TryParse(_connection.ServerInfo.Version.Split('-')[0], out var serverVersion)) + { + ServerSupportsScheduledSend = serverVersion >= MinScheduledSendVersion; + if (ServerSupportsScheduledSend) + { + _logger.LogInformation( + "NATS server version {Version} supports scheduled message delivery", + _connection.ServerInfo.Version); + } + } + + if (Configuration.EnableJetStream) + { + _jetStreamContext = _connection.CreateJetStreamContext(); + _logger.LogInformation("JetStream context initialized"); + + if (Configuration.AutoProvision && Configuration.Streams.Any()) + { + await ProvisionStreamsAsync(); + } + } + } + + public override IEnumerable DiagnosticColumns() + { + yield return new PropertyColumn("Subject", "header"); + yield return new PropertyColumn("Queue Group", "header"); + yield return new PropertyColumn("JetStream", "header"); + yield return new PropertyColumn("Consumer Name"); + } + + public static string NormalizeSubject(string subject) + { + return subject.Trim().Replace('/', '.'); + } + + public static string ExtractSubjectFromUri(Uri uri) + { + if (uri.Scheme != "nats") + { + throw new ArgumentException($"Invalid URI scheme. Expected 'nats', got '{uri.Scheme}'"); + } + + var path = uri.LocalPath.Trim('/'); + return string.IsNullOrEmpty(path) ? uri.Host : path; + } + + public NatsEndpoint EndpointForSubject(string subject) + { + var normalized = NormalizeSubject(subject); + return _endpoints[normalized]; + } + + public async ValueTask DisposeAsync() + { + try + { + if (_connection != null) + { + await _connection.DisposeAsync(); + } + } + catch (Exception ex) + { + _logger?.LogError(ex, "Error disposing NATS connection"); + } + } + + private async Task ProvisionStreamsAsync() + { + _logger?.LogInformation( + "Provisioning {Count} configured streams", + Configuration.Streams.Count + ); + + foreach (var (name, config) in Configuration.Streams) + { + try + { + var exists = false; + try + { + await JetStreamContext.GetStreamAsync(name); + exists = true; + _logger?.LogDebug("Stream {StreamName} already exists", name); + } + catch (NatsJSException) + { + } + + if (!exists) + { + var streamConfig = new StreamConfig(name, config.Subjects) + { + Retention = config.Retention, + Storage = config.Storage, + MaxMsgs = config.MaxMessages ?? -1, + MaxBytes = config.MaxBytes ?? -1, + MaxAge = config.MaxAge ?? TimeSpan.Zero, + MaxMsgsPerSubject = config.MaxMessagesPerSubject ?? 0, + Discard = config.DiscardPolicy, + NumReplicas = config.Replicas, + AllowRollupHdrs = config.AllowRollup, + AllowDirect = config.AllowDirect, + DenyDelete = config.DenyDelete, + DenyPurge = config.DenyPurge, + AllowMsgSchedules = config.AllowMsgSchedules + }; + + await JetStreamContext.CreateStreamAsync(streamConfig); + _logger?.LogInformation( + "Created stream {StreamName} with subjects: {Subjects}", + name, + string.Join(", ", config.Subjects) + ); + } + else + { + _logger?.LogDebug( + "Stream {StreamName} already exists, skipping creation", + name + ); + } + } + catch (Exception ex) + { + _logger?.LogError(ex, "Failed to provision stream {StreamName}", name); + throw new InvalidOperationException($"Failed to provision stream '{name}'", ex); + } + } + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/NatsTransportExpression.cs b/src/Transports/NATS/Wolverine.Nats/Internal/NatsTransportExpression.cs new file mode 100644 index 000000000..028b6645d --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/NatsTransportExpression.cs @@ -0,0 +1,270 @@ +using Wolverine.Nats.Configuration; +using Wolverine.Transports; +using Wolverine.Transports.Sending; + +namespace Wolverine.Nats.Internal; + +public class NatsTransportExpression + : BrokerExpression< + NatsTransport, + NatsEndpoint, + NatsEndpoint, + NatsListenerConfiguration, + NatsSubscriberConfiguration, + NatsTransportExpression + > +{ + public NatsTransportExpression(NatsTransport transport, WolverineOptions options) + : base(transport, options) { } + + /// + /// Configure JetStream options + /// + public NatsTransportExpression UseJetStream(Action configure) + { + configure(Transport.Configuration.JetStreamDefaults); + Transport.Configuration.EnableJetStream = true; + return this; + } + + /// + /// Set the identifier prefix for all NATS subjects + /// + public NatsTransportExpression WithSubjectPrefix(string prefix) + { + Transport.IdentifierPrefix = prefix; + return this; + } + + /// + /// Configure TLS settings + /// + public NatsTransportExpression UseTls(bool insecureSkipVerify = false) + { + Transport.Configuration.EnableTls = true; + Transport.Configuration.TlsInsecure = insecureSkipVerify; + return this; + } + + /// + /// Configure username/password authentication + /// + public NatsTransportExpression WithCredentials(string username, string password) + { + Transport.Configuration.Username = username; + Transport.Configuration.Password = password; + return this; + } + + /// + /// Configure token authentication + /// + public NatsTransportExpression WithToken(string token) + { + Transport.Configuration.Token = token; + return this; + } + + /// + /// Configure NKey authentication + /// + public NatsTransportExpression WithNKey(string nkeyFile) + { + Transport.Configuration.NKeyFile = nkeyFile; + return this; + } + + /// + /// Set the JetStream domain + /// + public NatsTransportExpression UseJetStreamDomain(string domain) + { + Transport.Configuration.JetStreamDomain = domain; + return this; + } + + /// + /// Configure connection timeouts + /// + public NatsTransportExpression ConfigureTimeouts( + TimeSpan connectTimeout, + TimeSpan requestTimeout + ) + { + Transport.Configuration.ConnectTimeout = connectTimeout; + Transport.Configuration.RequestTimeout = requestTimeout; + return this; + } + + /// + /// Define a JetStream stream configuration + /// + public NatsTransportExpression DefineStream( + string streamName, + Action configure + ) + { + var streamConfig = new StreamConfiguration { Name = streamName }; + configure(streamConfig); + Transport.Configuration.Streams[streamName] = streamConfig; + Transport.Configuration.EnableJetStream = true; + return this; + } + + /// + /// Define a work queue stream (retention by interest) + /// + public NatsTransportExpression DefineWorkQueueStream( + string streamName, + params string[] subjects + ) + { + return DefineWorkQueueStream(streamName, null, subjects); + } + + /// + /// Define a work queue stream (retention by interest) with additional configuration + /// + public NatsTransportExpression DefineWorkQueueStream( + string streamName, + Action? configure, + params string[] subjects + ) + { + return DefineStream( + streamName, + stream => + { + stream.AsWorkQueue().WithSubjects(subjects); + configure?.Invoke(stream); + } + ); + } + + /// + /// Define a log stream with time-based retention + /// + public NatsTransportExpression DefineLogStream( + string streamName, + TimeSpan retention, + params string[] subjects + ) + { + return DefineStream( + streamName, + stream => + { + stream.WithSubjects(subjects).WithLimits(maxAge: retention); + } + ); + } + + /// + /// Define a high-availability stream with replication + /// + public NatsTransportExpression DefineReplicatedStream( + string streamName, + int replicas, + params string[] subjects + ) + { + return DefineStream( + streamName, + stream => + { + stream.WithSubjects(subjects).WithReplicas(replicas); + } + ); + } + + /// + /// Configure multi-tenancy support + /// + public NatsTransportExpression ConfigureMultiTenancy( + TenantedIdBehavior behavior = TenantedIdBehavior.FallbackToDefault + ) + { + Transport.TenantedIdBehavior = behavior; + return this; + } + + /// + /// Set a custom tenant-subject mapper + /// + public NatsTransportExpression UseTenantSubjectMapper(ITenantSubjectMapper mapper) + { + Transport.TenantSubjectMapper = mapper ?? throw new ArgumentNullException(nameof(mapper)); + return this; + } + + /// + /// Add a tenant with subject-based isolation + /// + public NatsTransportExpression AddTenant(string tenantId) + { + var tenant = new NatsTenant(tenantId); + Transport.Tenants[tenantId] = tenant; + return this; + } + + /// + /// Add a tenant with a custom subject mapper + /// + public NatsTransportExpression AddTenant(string tenantId, ITenantSubjectMapper mapper) + { + var tenant = new NatsTenant(tenantId) + { + SubjectMapper = mapper + }; + Transport.Tenants[tenantId] = tenant; + return this; + } + + /// + /// Add a tenant with custom credentials + /// + public NatsTransportExpression AddTenantWithCredentials( + string tenantId, + string username, + string password + ) + { + var tenant = new NatsTenant(tenantId) + { + Username = username, + Password = password + }; + Transport.Tenants[tenantId] = tenant; + return this; + } + + /// + /// Add a tenant with JWT credentials file + /// + public NatsTransportExpression AddTenantWithCredentialsFile( + string tenantId, + string credentialsFile + ) + { + var tenant = new NatsTenant(tenantId) + { + CredentialsFile = credentialsFile + }; + Transport.Tenants[tenantId] = tenant; + return this; + } + + protected override NatsListenerConfiguration createListenerExpression( + NatsEndpoint listenerEndpoint + ) + { + return new NatsListenerConfiguration(listenerEndpoint); + } + + protected override NatsSubscriberConfiguration createSubscriberExpression( + NatsEndpoint subscriberEndpoint + ) + { + return new NatsSubscriberConfiguration(subscriberEndpoint); + } +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/NullSender.cs b/src/Transports/NATS/Wolverine.Nats/Internal/NullSender.cs new file mode 100644 index 000000000..405e28614 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/NullSender.cs @@ -0,0 +1,16 @@ +using Wolverine.Transports.Sending; + +namespace Wolverine.Nats.Internal; + +/// +/// Null object pattern for when no dead letter queue is configured +/// +internal class NullSender : ISender +{ + public bool SupportsNativeScheduledSend => false; + public Uri Destination => new Uri("nats://null"); + + public Task PingAsync() => Task.FromResult(true); + + public ValueTask SendAsync(Envelope envelope) => ValueTask.CompletedTask; +} diff --git a/src/Transports/NATS/Wolverine.Nats/Internal/TenantAwareNatsSender.cs b/src/Transports/NATS/Wolverine.Nats/Internal/TenantAwareNatsSender.cs new file mode 100644 index 000000000..73b647820 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Internal/TenantAwareNatsSender.cs @@ -0,0 +1,43 @@ +using Wolverine.Transports.Sending; + +namespace Wolverine.Nats.Internal; + +/// +/// A wrapper around a base NATS sender that applies tenant-specific subject mapping +/// +internal class TenantAwareNatsSender : ISender +{ + private readonly ISender _innerSender; + private readonly string _tenantId; + private readonly ITenantSubjectMapper _subjectMapper; + + public TenantAwareNatsSender(ISender innerSender, string tenantId, ITenantSubjectMapper subjectMapper) + { + _innerSender = innerSender ?? throw new ArgumentNullException(nameof(innerSender)); + _tenantId = tenantId ?? throw new ArgumentNullException(nameof(tenantId)); + _subjectMapper = subjectMapper ?? throw new ArgumentNullException(nameof(subjectMapper)); + } + + public Uri Destination => _innerSender.Destination; + + public bool SupportsNativeScheduledSend => _innerSender.SupportsNativeScheduledSend; + + public Task PingAsync() => _innerSender.PingAsync(); + + public async ValueTask SendAsync(Envelope envelope) + { + var tenantEnvelope = new Envelope(envelope) + { + TenantId = _tenantId + }; + + if (tenantEnvelope.Destination != null) + { + var originalSubject = NatsTransport.ExtractSubjectFromUri(tenantEnvelope.Destination); + var tenantSubject = _subjectMapper.MapSubject(originalSubject, _tenantId); + tenantEnvelope.Destination = new Uri($"nats://subject/{tenantSubject}"); + } + + await _innerSender.SendAsync(tenantEnvelope); + } +} \ No newline at end of file diff --git a/src/Transports/NATS/Wolverine.Nats/Wolverine.Nats.csproj b/src/Transports/NATS/Wolverine.Nats/Wolverine.Nats.csproj new file mode 100644 index 000000000..d9d180201 --- /dev/null +++ b/src/Transports/NATS/Wolverine.Nats/Wolverine.Nats.csproj @@ -0,0 +1,21 @@ + + + + NATS Transport for Wolverine Messaging Systems + WolverineFx.Nats + false + false + false + false + false + + + + + + + + + + + diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index c6a9c7b92..d6e09dd2b 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -227,7 +227,7 @@ public async ValueTask PauseAsync(TimeSpan pauseTime) _restarter = new Restarter(this, pauseTime); } - private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0, 1); + private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); public async ValueTask MarkAsTooBusyAndStopReceivingAsync() { diff --git a/wolverine.sln b/wolverine.sln index c81a5f329..3f69a2afd 100644 --- a/wolverine.sln +++ b/wolverine.sln @@ -1,4 +1,4 @@ - + Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 18 VisualStudioVersion = 18.1.11304.174 d18.0 @@ -209,6 +209,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.MQTT", "src\Trans EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.MQTT.Tests", "src\Transports\MQTT\Wolverine.MQTT.Tests\Wolverine.MQTT.Tests.csproj", "{F0F8EA19-0AB7-4D23-944A-59DA514B20D0}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "NATS", "NATS", "{85FFF74F-5762-4E11-A328-2BD4794281E6}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.Nats", "src\Transports\NATS\Wolverine.Nats\Wolverine.Nats.csproj", "{F939E430-43AB-435D-B74F-9FC7DAFE5074}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.Nats.Tests", "src\Transports\NATS\Wolverine.Nats.Tests\Wolverine.Nats.Tests.csproj", "{3C9ACC27-F486-4765-B703-ED64A34E28C6}" +EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.AdminApi", "src\Http\Wolverine.AdminApi\Wolverine.AdminApi.csproj", "{9A3741FD-C0EF-4275-8CB0-77D6EB407E88}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ChaosSender", "src\Samples\ChaosSender\ChaosSender.csproj", "{6F6FB8FC-564C-4B04-B254-EB53A7E4562F}" @@ -1211,6 +1217,30 @@ Global {F0F8EA19-0AB7-4D23-944A-59DA514B20D0}.Release|x64.Build.0 = Release|Any CPU {F0F8EA19-0AB7-4D23-944A-59DA514B20D0}.Release|x86.ActiveCfg = Release|Any CPU {F0F8EA19-0AB7-4D23-944A-59DA514B20D0}.Release|x86.Build.0 = Release|Any CPU + {F939E430-43AB-435D-B74F-9FC7DAFE5074}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F939E430-43AB-435D-B74F-9FC7DAFE5074}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F939E430-43AB-435D-B74F-9FC7DAFE5074}.Debug|x64.ActiveCfg = Debug|Any CPU + {F939E430-43AB-435D-B74F-9FC7DAFE5074}.Debug|x64.Build.0 = Debug|Any CPU + {F939E430-43AB-435D-B74F-9FC7DAFE5074}.Debug|x86.ActiveCfg = Debug|Any CPU + {F939E430-43AB-435D-B74F-9FC7DAFE5074}.Debug|x86.Build.0 = Debug|Any CPU + {F939E430-43AB-435D-B74F-9FC7DAFE5074}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F939E430-43AB-435D-B74F-9FC7DAFE5074}.Release|Any CPU.Build.0 = Release|Any CPU + {F939E430-43AB-435D-B74F-9FC7DAFE5074}.Release|x64.ActiveCfg = Release|Any CPU + {F939E430-43AB-435D-B74F-9FC7DAFE5074}.Release|x64.Build.0 = Release|Any CPU + {F939E430-43AB-435D-B74F-9FC7DAFE5074}.Release|x86.ActiveCfg = Release|Any CPU + {F939E430-43AB-435D-B74F-9FC7DAFE5074}.Release|x86.Build.0 = Release|Any CPU + {3C9ACC27-F486-4765-B703-ED64A34E28C6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3C9ACC27-F486-4765-B703-ED64A34E28C6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3C9ACC27-F486-4765-B703-ED64A34E28C6}.Debug|x64.ActiveCfg = Debug|Any CPU + {3C9ACC27-F486-4765-B703-ED64A34E28C6}.Debug|x64.Build.0 = Debug|Any CPU + {3C9ACC27-F486-4765-B703-ED64A34E28C6}.Debug|x86.ActiveCfg = Debug|Any CPU + {3C9ACC27-F486-4765-B703-ED64A34E28C6}.Debug|x86.Build.0 = Debug|Any CPU + {3C9ACC27-F486-4765-B703-ED64A34E28C6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3C9ACC27-F486-4765-B703-ED64A34E28C6}.Release|Any CPU.Build.0 = Release|Any CPU + {3C9ACC27-F486-4765-B703-ED64A34E28C6}.Release|x64.ActiveCfg = Release|Any CPU + {3C9ACC27-F486-4765-B703-ED64A34E28C6}.Release|x64.Build.0 = Release|Any CPU + {3C9ACC27-F486-4765-B703-ED64A34E28C6}.Release|x86.ActiveCfg = Release|Any CPU + {3C9ACC27-F486-4765-B703-ED64A34E28C6}.Release|x86.Build.0 = Release|Any CPU {9A3741FD-C0EF-4275-8CB0-77D6EB407E88}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {9A3741FD-C0EF-4275-8CB0-77D6EB407E88}.Debug|Any CPU.Build.0 = Debug|Any CPU {9A3741FD-C0EF-4275-8CB0-77D6EB407E88}.Debug|x64.ActiveCfg = Debug|Any CPU @@ -1857,6 +1887,9 @@ Global {7040CD14-EF4B-40B4-B702-0D59D1F3AEE5} = {63E9B289-95E8-4F2B-A064-156971A6853C} {92A1F3D3-1AF1-4112-B956-38E8DB1B345B} = {F8DCB579-56BE-49D2-9A9B-BEAA0C457A8B} {F0F8EA19-0AB7-4D23-944A-59DA514B20D0} = {F8DCB579-56BE-49D2-9A9B-BEAA0C457A8B} + {85FFF74F-5762-4E11-A328-2BD4794281E6} = {84D32C8B-9CCE-4925-9AEC-8F445C7A2E3D} + {F939E430-43AB-435D-B74F-9FC7DAFE5074} = {85FFF74F-5762-4E11-A328-2BD4794281E6} + {3C9ACC27-F486-4765-B703-ED64A34E28C6} = {85FFF74F-5762-4E11-A328-2BD4794281E6} {9A3741FD-C0EF-4275-8CB0-77D6EB407E88} = {4B0BC1E5-17F9-4DD0-AC93-DDC522E1BE3C} {6F6FB8FC-564C-4B04-B254-EB53A7E4562F} = {D953D733-D154-4DF2-B2B9-30BF942E1B6B} {A484AD9E-04C7-4CF9-BB59-5C7DE772851C} = {4B0BC1E5-17F9-4DD0-AC93-DDC522E1BE3C}