diff --git a/src/Directory.Packages.props b/src/Directory.Packages.props index 63a353dd7db..a2d56ec8e2d 100644 --- a/src/Directory.Packages.props +++ b/src/Directory.Packages.props @@ -3,7 +3,6 @@ true - @@ -38,6 +37,8 @@ + + diff --git a/src/HotChocolate/Core/src/Subscriptions.Nats/DependencyInjection/NatsPubSubExtensions.cs b/src/HotChocolate/Core/src/Subscriptions.Nats/DependencyInjection/NatsPubSubExtensions.cs index c576abfeae1..f1bd6b061fb 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Nats/DependencyInjection/NatsPubSubExtensions.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Nats/DependencyInjection/NatsPubSubExtensions.cs @@ -14,7 +14,7 @@ public static class NatsPubSubExtensions { /// /// Adds support for using NATS as a subscription provider. - /// Ensure you have configured the NATS client using AddNats(...) + /// Ensure you have configured the NATS client using AddNatsClient(...) /// before calling this method. /// /// diff --git a/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj b/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj index 2afeb472287..02f2478826b 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj +++ b/src/HotChocolate/Core/src/Subscriptions.Nats/HotChocolate.Subscriptions.Nats.csproj @@ -17,7 +17,7 @@ - + diff --git a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs index 9d27b123bfe..d9aca52377a 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs @@ -1,5 +1,5 @@ -using AlterNats; using HotChocolate.Subscriptions.Diagnostics; +using NATS.Client.Core; namespace HotChocolate.Subscriptions.Nats; @@ -34,7 +34,9 @@ protected override async ValueTask OnSendAsync( CancellationToken cancellationToken = default) { var serialized = _serializer.Serialize(message); - await _connection.PublishAsync(formattedTopic, serialized).ConfigureAwait(false); + await _connection + .PublishAsync(formattedTopic, serialized, cancellationToken: cancellationToken) + .ConfigureAwait(false); } protected override async ValueTask OnCompleteAsync(string formattedTopic) diff --git a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs index a9123471e03..e2bacad75be 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs +++ b/src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs @@ -1,6 +1,6 @@ using System.Diagnostics; -using AlterNats; using HotChocolate.Subscriptions.Diagnostics; +using NATS.Client.Core; using static HotChocolate.Subscriptions.Nats.NatsResources; namespace HotChocolate.Subscriptions.Nats; @@ -28,44 +28,103 @@ protected override async ValueTask OnConnectAsync( { // We ensure that the processing is not started before the context is fully initialized. Debug.Assert(_connection != null, "_connection != null"); - Debug.Assert(_connection != null, "_serializer != null"); + Debug.Assert(_serializer != null, "_serializer != null"); + var sessionCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); var natsSession = await _connection - .SubscribeAsync(Name, (string? m) => DispatchMessage(_serializer, m)) + .SubscribeCoreAsync(Name, cancellationToken: sessionCts.Token) .ConfigureAwait(false); + var processing = ProcessMessagesAsync(natsSession, sessionCts.Token); + + async Task ProcessMessagesAsync( + INatsSub natsSubscription, + CancellationToken ct) + { + try + { + await foreach (var message in natsSubscription.Msgs.ReadAllAsync(ct).ConfigureAwait(false)) + { + DispatchMessage(_serializer, message.Data); + } + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + } + catch (ObjectDisposedException) when (ct.IsCancellationRequested) + { + } + catch (Exception ex) + { + DiagnosticEvents.MessageProcessingError(Name, ex); + } + } DiagnosticEvents.ProviderTopicInfo(Name, NatsTopic_ConnectAsync_SubscribedToNats); - return new Session(Name, natsSession, DiagnosticEvents); + return new Session(Name, natsSession, processing, sessionCts, DiagnosticEvents); } private sealed class Session : IAsyncDisposable { private readonly string _name; - private readonly IDisposable _natsSession; + private readonly INatsSub _natsSession; + private readonly Task _processing; + private readonly CancellationTokenSource _sessionCts; private readonly ISubscriptionDiagnosticEvents _diagnosticEvents; private bool _disposed; public Session( string name, - IDisposable natsSession, + INatsSub natsSession, + Task processing, + CancellationTokenSource sessionCts, ISubscriptionDiagnosticEvents diagnosticEvents) { _name = name; _natsSession = natsSession; + _processing = processing; + _sessionCts = sessionCts; _diagnosticEvents = diagnosticEvents; } - public ValueTask DisposeAsync() + public async ValueTask DisposeAsync() { - if (!_disposed) + if (_disposed) + { + return; + } + + _disposed = true; + _sessionCts.Cancel(); + + try + { + await _natsSession.DisposeAsync().ConfigureAwait(false); + } + catch (OperationCanceledException) when (_sessionCts.IsCancellationRequested) + { + } + catch (ObjectDisposedException) when (_sessionCts.IsCancellationRequested) + { + } + catch (Exception ex) + { + _diagnosticEvents.MessageProcessingError(_name, ex); + } + + try + { + await _processing.ConfigureAwait(false); + } + catch (OperationCanceledException) when (_sessionCts.IsCancellationRequested) + { + } + catch (ObjectDisposedException) when (_sessionCts.IsCancellationRequested) { - _natsSession.Dispose(); - _diagnosticEvents.ProviderTopicInfo(_name, Session_Dispose_UnsubscribedFromNats); - _disposed = true; } - return ValueTask.CompletedTask; + _sessionCts.Dispose(); + _diagnosticEvents.ProviderTopicInfo(_name, Session_Dispose_UnsubscribedFromNats); } } } diff --git a/src/HotChocolate/Core/src/Subscriptions.Nats/readme.md b/src/HotChocolate/Core/src/Subscriptions.Nats/readme.md index 8b7b8deeb26..8cf335b62e1 100644 --- a/src/HotChocolate/Core/src/Subscriptions.Nats/readme.md +++ b/src/HotChocolate/Core/src/Subscriptions.Nats/readme.md @@ -17,19 +17,19 @@ You can start with a single node of NATS and see where you need to go from there You do not need to enable persistence in the NATS server (JetStream) for Publish/Subscribe to function. ```csharp -using AlterNats; using HotChocolate.Execution; using HotChocolate.Subscriptions; +using NATS.Extensions.Microsoft.DependencyInjection; var builder = WebApplication.CreateBuilder(args); builder.Services - .AddNats(poolSize: 1, opts => opts with - { - Url = "nats://localhost:4222", - // Optional serializer (defaults to System.Text.Json) - Serializer = new MessagePackNatsSerializer() - }) + .AddNatsClient(nats => nats.ConfigureOptions( + options => options.Configure( + opts => opts.Opts = opts.Opts with + { + Url = "nats://localhost:4222" + }))) .AddNatsSubscriptions() .AddGraphQLServer() .AddMutationConventions() diff --git a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj index b62976e04f3..f876bd55a53 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj +++ b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/HotChocolate.Subscriptions.Nats.Tests.csproj @@ -22,6 +22,7 @@ + diff --git a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs index f795cb6aaba..aad0d6173fc 100644 --- a/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs +++ b/src/HotChocolate/Core/test/Subscriptions.Nats.Tests/NatsIntegrationTests.cs @@ -1,6 +1,7 @@ -using AlterNats; -using Microsoft.Extensions.DependencyInjection; using HotChocolate.Execution.Configuration; +using HotChocolate.Execution; +using Microsoft.Extensions.DependencyInjection; +using NATS.Extensions.Microsoft.DependencyInjection; using Squadron; using Xunit.Abstractions; @@ -48,14 +49,68 @@ public override Task Subscribe_And_Complete_Topic() public override Task Subscribe_And_Complete_Topic_With_ValueTypeMessage() => base.Subscribe_And_Complete_Topic_With_ValueTypeMessage(); + [Fact] + public async Task Subscribe_With_Different_Prefixes_Should_Not_Leak_Messages() + { + using var cts = new CancellationTokenSource(Timeout); + await using var primary = CreateServer(builder => + { + builder + .AddSubscriptionType() + .ModifyOptions(o => o.StrictValidation = false); + builder.Services.AddSingleton(new SubscriptionOptions { TopicPrefix = "primary" }); + }); + await using var secondary = CreateServer(builder => + { + builder + .AddSubscriptionType() + .ModifyOptions(o => o.StrictValidation = false); + builder.Services.AddSingleton(new SubscriptionOptions { TopicPrefix = "secondary" }); + }); + + var result = await primary.ExecuteRequestAsync( + "subscription { onMessage }", + cancellationToken: cts.Token); + await using var responseStream = result.ExpectResponseStream(); + var results = responseStream.ReadResultsAsync().ConfigureAwait(false); + + var primarySender = primary.GetRequiredService(); + var secondarySender = secondary.GetRequiredService(); + + await secondarySender.SendAsync("OnMessage", "secondary", cts.Token); + await secondarySender.CompleteAsync("OnMessage"); + + await primarySender.SendAsync("OnMessage", "primary", cts.Token); + await primarySender.CompleteAsync("OnMessage"); + + var snapshot = new Snapshot(); + + await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false)) + { + snapshot.Add(response); + } + + snapshot.MatchInline( + """ + { + "data": { + "onMessage": "primary" + } + } + """); + } + protected override void ConfigurePubSub(IRequestExecutorBuilder graphqlBuilder) { - // register NATS + // register NATS client graphqlBuilder.Services - .AddNats(poolSize: 1, options => options with - { - Url = _natsResource.NatsConnectionString - }) + .AddNatsClient( + builder => builder.ConfigureOptions( + options => options.Configure( + nats => nats.Opts = nats.Opts with + { + Url = _natsResource.NatsConnectionString + }))) .AddLogging(); // register subscription provider diff --git a/website/src/docs/hotchocolate/v16/defining-a-schema/subscriptions.md b/website/src/docs/hotchocolate/v16/defining-a-schema/subscriptions.md index 17f8ba83e87..f233b1c83e4 100644 --- a/website/src/docs/hotchocolate/v16/defining-a-schema/subscriptions.md +++ b/website/src/docs/hotchocolate/v16/defining-a-schema/subscriptions.md @@ -168,7 +168,7 @@ app.UseEndpoints(endpoints => }); ``` -To make pub/sub work, we also have to register a subscription provider. A subscription provider represents a pub/sub implementation used to handle events. Out of the box we support two subscription providers. +To make pub/sub work, we also have to register a subscription provider. A subscription provider represents a pub/sub implementation used to handle events. Out of the box we support four subscription providers. ## In-Memory Provider @@ -198,6 +198,55 @@ builder.Services Our Redis subscription provider uses the [StackExchange.Redis](https://github.com/StackExchange/StackExchange.Redis) Redis client underneath. +## NATS Provider + +The NATS subscription provider enables us to run multiple instances of our Hot Chocolate GraphQL server and handle subscription events reliably over NATS. + +In order to use the NATS provider we have to add the `HotChocolate.Subscriptions.Nats` and `NATS.Extensions.Microsoft.DependencyInjection` packages. + + + + + +After we have added the packages we can setup the NATS subscription provider. + +```csharp +using NATS.Extensions.Microsoft.DependencyInjection; + +builder.Services + .AddNatsClient( + nats => nats.ConfigureOptions( + options => options.Configure( + opts => opts.Opts = opts.Opts with + { + Url = "nats://localhost:4222" + }))); + +builder.Services + .AddGraphQLServer() + .AddQueryType() // every GraphQL server needs a query + .AddSubscriptionType() + .AddNatsSubscriptions(); +``` + +If multiple distinct GraphQL servers share the same NATS broker, configure a `TopicPrefix` to isolate their topics: + +```csharp +using HotChocolate.Subscriptions; + +builder.Services + .AddGraphQLServer() + .AddQueryType() // every GraphQL server needs a query + .AddSubscriptionType() + .AddNatsSubscriptions( + new SubscriptionOptions + { + TopicPrefix = "orders-service-dev" + }); +``` + +The NATS provider uses NATS core publish/subscribe; JetStream is not required. + ## Postgres Provider The PostgreSQL Subscription Provider enables your GraphQL server to provide real-time updates to your clients using PostgreSQL's native `LISTEN/NOTIFY` mechanism. This provider is ideal for applications that already use PostgreSQL and want to avoid the overhead of running a separate pub/sub service. diff --git a/website/src/docs/hotchocolate/v16/migrating/migrate-from-15-to-16.md b/website/src/docs/hotchocolate/v16/migrating/migrate-from-15-to-16.md index 059a063ef22..d95c37d7263 100644 --- a/website/src/docs/hotchocolate/v16/migrating/migrate-from-15-to-16.md +++ b/website/src/docs/hotchocolate/v16/migrating/migrate-from-15-to-16.md @@ -131,6 +131,32 @@ builder.Services.AddGraphQLServer() If your application contains multiple GraphQL servers, the hash provider configuration has to be repeated for each one as the configuration is now scoped to a particular GraphQL server. +## NATS subscriptions now use the official NATS v2 client + +The `HotChocolate.Subscriptions.Nats` package now uses the official NATS v2 client packages. +If you are migrating an application that previously used `AlterNats.Hosting`, replace it with `NATS.Extensions.Microsoft.DependencyInjection` and update your NATS client registration from `AddNats(...)` to `AddNatsClient(...)`. + +```diff +builder.Services +- .AddNats(poolSize: 1, opts => opts with +- { +- Url = "nats://localhost:4222" +- }); ++ .AddNatsClient(nats => nats.ConfigureOptions( ++ options => options.Configure( ++ opts => opts.Opts = opts.Opts with ++ { ++ Url = "nats://localhost:4222" ++ }))); + +builder.Services + .AddGraphQLServer() + .AddSubscriptionType() + .AddNatsSubscriptions(); +``` + +If your code directly references NATS client types, add the `NATS.Client.Core` package as well. + ## MaxAllowedNodeBatchSize & EnsureAllNodesCanBeResolved options moved ```diff