Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="AlterNats.Hosting" Version="1.0.6" />
<PackageVersion Include="Aspire.Hosting.AppHost" Version="13.0.2" />
<PackageVersion Include="Aspire.Hosting.PostgreSQL" Version="13.0.2" />
<PackageVersion Include="Aspire.Hosting" Version="13.0.2" />
Expand Down Expand Up @@ -38,6 +37,8 @@
<PackageVersion Include="Microsoft.Testing.Extensions.CodeCoverage" Version="18.1.0" />
<PackageVersion Include="MongoDB.Driver" Version="3.0.0" />
<PackageVersion Include="Moq" Version="4.20.72" />
<PackageVersion Include="NATS.Client.Core" Version="2.7.2" />
<PackageVersion Include="NATS.Extensions.Microsoft.DependencyInjection" Version="2.7.2" />
<PackageVersion Include="NetTopologySuite" Version="2.0.0" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.2" />
<PackageVersion Include="NodaTime" Version="3.2.2" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public static class NatsPubSubExtensions
{
/// <summary>
/// Adds support for using NATS as a subscription provider.
/// Ensure you have configured the NATS client using <code>AddNats(...)</code>
/// Ensure you have configured the NATS client using <code>AddNatsClient(...)</code>
/// before calling this method.
/// </summary>
/// <param name="builder">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="AlterNats.Hosting" />
<PackageReference Include="NATS.Client.Core" />
</ItemGroup>

<ItemGroup>
Expand Down
6 changes: 4 additions & 2 deletions src/HotChocolate/Core/src/Subscriptions.Nats/NatsPubSub.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using AlterNats;
using HotChocolate.Subscriptions.Diagnostics;
using NATS.Client.Core;

namespace HotChocolate.Subscriptions.Nats;

Expand Down Expand Up @@ -34,7 +34,9 @@ protected override async ValueTask OnSendAsync<TMessage>(
CancellationToken cancellationToken = default)
{
var serialized = _serializer.Serialize(message);
await _connection.PublishAsync(formattedTopic, serialized).ConfigureAwait(false);
await _connection
.PublishAsync(formattedTopic, serialized, cancellationToken: cancellationToken)
.ConfigureAwait(false);
}

protected override async ValueTask OnCompleteAsync(string formattedTopic)
Expand Down
83 changes: 71 additions & 12 deletions src/HotChocolate/Core/src/Subscriptions.Nats/NatsTopic.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System.Diagnostics;
using AlterNats;
using HotChocolate.Subscriptions.Diagnostics;
using NATS.Client.Core;
using static HotChocolate.Subscriptions.Nats.NatsResources;

namespace HotChocolate.Subscriptions.Nats;
Expand Down Expand Up @@ -28,44 +28,103 @@ protected override async ValueTask<IAsyncDisposable> OnConnectAsync(
{
// We ensure that the processing is not started before the context is fully initialized.
Debug.Assert(_connection != null, "_connection != null");
Debug.Assert(_connection != null, "_serializer != null");
Debug.Assert(_serializer != null, "_serializer != null");

var sessionCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var natsSession = await _connection
.SubscribeAsync(Name, (string? m) => DispatchMessage(_serializer, m))
.SubscribeCoreAsync<string?>(Name, cancellationToken: sessionCts.Token)
.ConfigureAwait(false);
var processing = ProcessMessagesAsync(natsSession, sessionCts.Token);

async Task ProcessMessagesAsync(
INatsSub<string?> natsSubscription,
CancellationToken ct)
{
try
{
await foreach (var message in natsSubscription.Msgs.ReadAllAsync(ct).ConfigureAwait(false))
{
DispatchMessage(_serializer, message.Data);
}
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
}
catch (ObjectDisposedException) when (ct.IsCancellationRequested)
{
}
catch (Exception ex)
{
DiagnosticEvents.MessageProcessingError(Name, ex);
}
}

DiagnosticEvents.ProviderTopicInfo(Name, NatsTopic_ConnectAsync_SubscribedToNats);

return new Session(Name, natsSession, DiagnosticEvents);
return new Session(Name, natsSession, processing, sessionCts, DiagnosticEvents);
}

private sealed class Session : IAsyncDisposable
{
private readonly string _name;
private readonly IDisposable _natsSession;
private readonly INatsSub<string?> _natsSession;
private readonly Task _processing;
private readonly CancellationTokenSource _sessionCts;
private readonly ISubscriptionDiagnosticEvents _diagnosticEvents;
private bool _disposed;

public Session(
string name,
IDisposable natsSession,
INatsSub<string?> natsSession,
Task processing,
CancellationTokenSource sessionCts,
ISubscriptionDiagnosticEvents diagnosticEvents)
{
_name = name;
_natsSession = natsSession;
_processing = processing;
_sessionCts = sessionCts;
_diagnosticEvents = diagnosticEvents;
}

public ValueTask DisposeAsync()
public async ValueTask DisposeAsync()
{
if (!_disposed)
if (_disposed)
{
return;
}

_disposed = true;
_sessionCts.Cancel();

try
{
await _natsSession.DisposeAsync().ConfigureAwait(false);
}
catch (OperationCanceledException) when (_sessionCts.IsCancellationRequested)
{
}
catch (ObjectDisposedException) when (_sessionCts.IsCancellationRequested)
{
}
catch (Exception ex)
{
_diagnosticEvents.MessageProcessingError(_name, ex);
}

try
{
await _processing.ConfigureAwait(false);
}
catch (OperationCanceledException) when (_sessionCts.IsCancellationRequested)
{
}
catch (ObjectDisposedException) when (_sessionCts.IsCancellationRequested)
{
_natsSession.Dispose();
_diagnosticEvents.ProviderTopicInfo(_name, Session_Dispose_UnsubscribedFromNats);
_disposed = true;
}

return ValueTask.CompletedTask;
_sessionCts.Dispose();
_diagnosticEvents.ProviderTopicInfo(_name, Session_Dispose_UnsubscribedFromNats);
}
}
}
14 changes: 7 additions & 7 deletions src/HotChocolate/Core/src/Subscriptions.Nats/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ You can start with a single node of NATS and see where you need to go from there
You do not need to enable persistence in the NATS server (JetStream) for Publish/Subscribe to function.

```csharp
using AlterNats;
using HotChocolate.Execution;
using HotChocolate.Subscriptions;
using NATS.Extensions.Microsoft.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

builder.Services
.AddNats(poolSize: 1, opts => opts with
{
Url = "nats://localhost:4222",
// Optional serializer (defaults to System.Text.Json)
Serializer = new MessagePackNatsSerializer()
})
.AddNatsClient(nats => nats.ConfigureOptions(
options => options.Configure(
opts => opts.Opts = opts.Opts with
{
Url = "nats://localhost:4222"
})))
.AddNatsSubscriptions()
.AddGraphQLServer()
.AddMutationConventions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging" />
<PackageReference Include="NATS.Extensions.Microsoft.DependencyInjection" />
<PackageReference Include="Squadron.Nats" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using AlterNats;
using Microsoft.Extensions.DependencyInjection;
using HotChocolate.Execution.Configuration;
using HotChocolate.Execution;
using Microsoft.Extensions.DependencyInjection;
using NATS.Extensions.Microsoft.DependencyInjection;
using Squadron;
using Xunit.Abstractions;

Expand Down Expand Up @@ -48,14 +49,68 @@ public override Task Subscribe_And_Complete_Topic()
public override Task Subscribe_And_Complete_Topic_With_ValueTypeMessage()
=> base.Subscribe_And_Complete_Topic_With_ValueTypeMessage();

[Fact]
public async Task Subscribe_With_Different_Prefixes_Should_Not_Leak_Messages()
{
using var cts = new CancellationTokenSource(Timeout);
await using var primary = CreateServer(builder =>
{
builder
.AddSubscriptionType<Subscription>()
.ModifyOptions(o => o.StrictValidation = false);
builder.Services.AddSingleton(new SubscriptionOptions { TopicPrefix = "primary" });
});
await using var secondary = CreateServer(builder =>
{
builder
.AddSubscriptionType<Subscription>()
.ModifyOptions(o => o.StrictValidation = false);
builder.Services.AddSingleton(new SubscriptionOptions { TopicPrefix = "secondary" });
});

var result = await primary.ExecuteRequestAsync(
"subscription { onMessage }",
cancellationToken: cts.Token);
await using var responseStream = result.ExpectResponseStream();
var results = responseStream.ReadResultsAsync().ConfigureAwait(false);

var primarySender = primary.GetRequiredService<ITopicEventSender>();
var secondarySender = secondary.GetRequiredService<ITopicEventSender>();

await secondarySender.SendAsync("OnMessage", "secondary", cts.Token);
await secondarySender.CompleteAsync("OnMessage");

await primarySender.SendAsync("OnMessage", "primary", cts.Token);
await primarySender.CompleteAsync("OnMessage");

var snapshot = new Snapshot();

await foreach (var response in results.WithCancellation(cts.Token).ConfigureAwait(false))
{
snapshot.Add(response);
}

snapshot.MatchInline(
"""
{
"data": {
"onMessage": "primary"
}
}
""");
}

protected override void ConfigurePubSub(IRequestExecutorBuilder graphqlBuilder)
{
// register NATS
// register NATS client
graphqlBuilder.Services
.AddNats(poolSize: 1, options => options with
{
Url = _natsResource.NatsConnectionString
})
.AddNatsClient(
builder => builder.ConfigureOptions(
options => options.Configure(
nats => nats.Opts = nats.Opts with
{
Url = _natsResource.NatsConnectionString
})))
.AddLogging();

// register subscription provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ app.UseEndpoints(endpoints =>
});
```

To make pub/sub work, we also have to register a subscription provider. A subscription provider represents a pub/sub implementation used to handle events. Out of the box we support two subscription providers.
To make pub/sub work, we also have to register a subscription provider. A subscription provider represents a pub/sub implementation used to handle events. Out of the box we support four subscription providers.

## In-Memory Provider

Expand Down Expand Up @@ -198,6 +198,55 @@ builder.Services

Our Redis subscription provider uses the [StackExchange.Redis](https://github.com/StackExchange/StackExchange.Redis) Redis client underneath.

## NATS Provider

The NATS subscription provider enables us to run multiple instances of our Hot Chocolate GraphQL server and handle subscription events reliably over NATS.

In order to use the NATS provider we have to add the `HotChocolate.Subscriptions.Nats` and `NATS.Extensions.Microsoft.DependencyInjection` packages.

<PackageInstallation packageName="HotChocolate.Subscriptions.Nats" />

<PackageInstallation packageName="NATS.Extensions.Microsoft.DependencyInjection" external />

After we have added the packages we can setup the NATS subscription provider.

```csharp
using NATS.Extensions.Microsoft.DependencyInjection;

builder.Services
.AddNatsClient(
nats => nats.ConfigureOptions(
options => options.Configure(
opts => opts.Opts = opts.Opts with
{
Url = "nats://localhost:4222"
})));

builder.Services
.AddGraphQLServer()
.AddQueryType<Query>() // every GraphQL server needs a query
.AddSubscriptionType<Subscription>()
.AddNatsSubscriptions();
```

If multiple distinct GraphQL servers share the same NATS broker, configure a `TopicPrefix` to isolate their topics:

```csharp
using HotChocolate.Subscriptions;

builder.Services
.AddGraphQLServer()
.AddQueryType<Query>() // every GraphQL server needs a query
.AddSubscriptionType<Subscription>()
.AddNatsSubscriptions(
new SubscriptionOptions
{
TopicPrefix = "orders-service-dev"
});
```

The NATS provider uses NATS core publish/subscribe; JetStream is not required.

## Postgres Provider

The PostgreSQL Subscription Provider enables your GraphQL server to provide real-time updates to your clients using PostgreSQL's native `LISTEN/NOTIFY` mechanism. This provider is ideal for applications that already use PostgreSQL and want to avoid the overhead of running a separate pub/sub service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,32 @@ builder.Services.AddGraphQLServer()

If your application contains multiple GraphQL servers, the hash provider configuration has to be repeated for each one as the configuration is now scoped to a particular GraphQL server.

## NATS subscriptions now use the official NATS v2 client

The `HotChocolate.Subscriptions.Nats` package now uses the official NATS v2 client packages.
If you are migrating an application that previously used `AlterNats.Hosting`, replace it with `NATS.Extensions.Microsoft.DependencyInjection` and update your NATS client registration from `AddNats(...)` to `AddNatsClient(...)`.

```diff
builder.Services
- .AddNats(poolSize: 1, opts => opts with
- {
- Url = "nats://localhost:4222"
- });
+ .AddNatsClient(nats => nats.ConfigureOptions(
+ options => options.Configure(
+ opts => opts.Opts = opts.Opts with
+ {
+ Url = "nats://localhost:4222"
+ })));

builder.Services
.AddGraphQLServer()
.AddSubscriptionType<Subscription>()
.AddNatsSubscriptions();
```

If your code directly references NATS client types, add the `NATS.Client.Core` package as well.

## MaxAllowedNodeBatchSize & EnsureAllNodesCanBeResolved options moved

```diff
Expand Down
Loading