Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions playground/nats/Nats.AppHost/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

builder.AddProject<Projects.Nats_ApiService>("api")
.WithExternalHttpEndpoints()
.WithReference(nats);
.WithReference(nats)
.WaitFor(nats);

builder.AddProject<Projects.Nats_Backend>("backend")
.WithReference(nats);
.WithReference(nats)
.WaitFor(nats);

builder.Build().Run();
9 changes: 3 additions & 6 deletions src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,13 @@ public static IResourceBuilder<MilvusServerResource> AddMilvus(this IDistributed

var milvus = new MilvusServerResource(name, apiKeyParameter);

string? connectionString = null;
MilvusClient? milvusClient = null;

builder.Eventing.Subscribe<ConnectionStringAvailableEvent>(milvus, async (@event, ct) =>
{
connectionString = await milvus.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false);
if (connectionString is null)
{
throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{milvus.Name}' resource but the connection string was null.");
}
var connectionString = await milvus.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to NATS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, just follow up milvus PR.

?? throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{milvus.Name}' resource but the connection string was null.");

milvusClient = CreateMilvusClient(@event.Services, connectionString);
var lookup = builder.Resources.OfType<MilvusDatabaseResource>().ToDictionary(d => d.Name);
foreach (var databaseName in milvus.Databases)
Expand Down
5 changes: 5 additions & 0 deletions src/Aspire.Hosting.Nats/Aspire.Hosting.Nats.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@

<ItemGroup>
<Compile Include="$(SharedDir)VolumeNameGenerator.cs" Link="Utils\VolumeNameGenerator.cs" />
<Compile Include="$(ComponentsDir)Aspire.NATS.Net\NatsHealthCheck.cs" Link="NatsHealthCheck"></Compile>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Aspire.Hosting\Aspire.Hosting.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="NATS.Net" />
</ItemGroup>

</Project>
35 changes: 34 additions & 1 deletion src/Aspire.Hosting.Nats/NatsBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Nats;
using Aspire.Hosting.Utils;
using Aspire.NATS.Net;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;

namespace Aspire.Hosting;

Expand All @@ -25,10 +30,38 @@ public static IResourceBuilder<NatsServerResource> AddNats(this IDistributedAppl
ArgumentNullException.ThrowIfNull(name);

var nats = new NatsServerResource(name);

NatsConnection? natsConnection = null;

builder.Eventing.Subscribe<ConnectionStringAvailableEvent>(nats, async (@event, ct) =>
{
var connectionString = await nats.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false)
?? throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{nats.Name}' resource but the connection string was null.");

var options = NatsOpts.Default with
{
LoggerFactory = @event.Services.GetRequiredService<ILoggerFactory>(),
};

options = options with { Url = connectionString };

natsConnection = new NatsConnection(options);
});

var healthCheckKey = $"{name}_check";
builder.Services.AddHealthChecks()
.Add(new HealthCheckRegistration(
healthCheckKey,
sp => new NatsHealthCheck(natsConnection!),
failureStatus: default,
tags: default,
timeout: default));

return builder.AddResource(nats)
.WithEndpoint(targetPort: 4222, port: port, name: NatsServerResource.PrimaryEndpointName)
.WithImage(NatsContainerImageTags.Image, NatsContainerImageTags.Tag)
.WithImageRegistry(NatsContainerImageTags.Registry);
.WithImageRegistry(NatsContainerImageTags.Registry)
.WithHealthCheck(healthCheckKey);
}

/// <summary>
Expand Down
42 changes: 42 additions & 0 deletions tests/Aspire.Hosting.Nats.Tests/NatsFunctionalTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using Aspire.Hosting.ApplicationModel;
using Microsoft.Extensions.Diagnostics.HealthChecks;
namespace Aspire.Hosting.Nats.Tests;

public class NatsFunctionalTests(ITestOutputHelper testOutputHelper)
Expand Down Expand Up @@ -236,4 +238,44 @@ private static async Task CreateTestData(INatsJSContext jetStream, CancellationT
ack.EnsureSuccess();
}
}

[Fact]
[RequiresDocker]
public async Task VerifyWaitForOnNatsBlocksDependentResources()
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);

var healthCheckTcs = new TaskCompletionSource<HealthCheckResult>();
builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () =>
{
return healthCheckTcs.Task;
});

var resource = builder.AddNats("resource")
.WithHealthCheck("blocking_check");

var dependentResource = builder.AddNats("dependentresource")
.WaitFor(resource);

using var app = builder.Build();

var pendingStart = app.StartAsync(cts.Token);

var rns = app.Services.GetRequiredService<ResourceNotificationService>();

await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token);

healthCheckTcs.SetResult(HealthCheckResult.Healthy());

await rns.WaitForResourceAsync(resource.Resource.Name, (re => re.Snapshot.HealthStatus == HealthStatus.Healthy), cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token);

await pendingStart;

await app.StopAsync();
}
}