From bed45a5d277fbf93db6c72fdc1a06225f25f2e85 Mon Sep 17 00:00:00 2001 From: Alireza Baloochi Date: Fri, 13 Sep 2024 20:12:35 +0330 Subject: [PATCH 1/3] Add waitfor milvus --- .../Aspire.Hosting.Milvus.csproj | 5 + .../MilvusBuilderExtensions.cs | 86 ++++++++++++++++- .../MilvusFunctionalTests.cs | 94 +++++++++++++++++++ 3 files changed, 182 insertions(+), 3 deletions(-) diff --git a/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj b/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj index 874d012244f..ad4fd4f7880 100644 --- a/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj +++ b/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj @@ -6,6 +6,8 @@ aspire integration hosting milvus database vector search Milvus vector database support for .NET Aspire. $(SharedDir)Milvus_256x.png + $(NoWarn);CS8002 + false @@ -23,4 +25,7 @@ + + + diff --git a/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs b/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs index a7de4edec50..2ab8ed5a44a 100644 --- a/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs +++ b/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs @@ -4,6 +4,9 @@ using Aspire.Hosting.ApplicationModel; using Aspire.Hosting.Milvus; using Aspire.Hosting.Utils; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Diagnostics.HealthChecks; +using Milvus.Client; namespace Aspire.Hosting; @@ -51,6 +54,38 @@ public static IResourceBuilder AddMilvus(this IDistributed var milvus = new MilvusServerResource(name, apiKeyParameter); + string? connectionString = null; + + builder.Eventing.Subscribe(milvus, async (@event, ct) => + { + connectionString = await milvus.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false); + if (connectionString is null) + { + throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{milvus.Name}' resource but the connection string was null."); + } + + var lookup = builder.Resources.OfType().ToDictionary(d => d.Name); + foreach (var databaseName in milvus.Databases) + { + if (!lookup.TryGetValue(databaseName.Key, out var databaseResource)) + { + throw new DistributedApplicationException($"Database resource '{databaseName}' under Milvus Server resource '{milvus.Name}' was not found in the model."); + } + var connectionStringAvailableEvent = new ConnectionStringAvailableEvent(databaseResource, @event.Services); + await builder.Eventing.PublishAsync(connectionStringAvailableEvent, ct).ConfigureAwait(false); + var beforeResourceStartedEvent = new BeforeResourceStartedEvent(databaseResource, @event.Services); + await builder.Eventing.PublishAsync(beforeResourceStartedEvent, ct).ConfigureAwait(false); + } + }); + + var healthCheckKey = $"{name}_check"; + builder.Services.AddHealthChecks().AddAsyncCheck(healthCheckKey, async () => + { + // TODO: Use health check from AspNetCore.Diagnostics.HealthChecks once it's implemented via this issue: + // https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2214 + return await HealthCheck(connectionString).ConfigureAwait(false); + }); + return builder.AddResource(milvus) .WithImage(MilvusContainerImageTags.Image, MilvusContainerImageTags.Tag) .WithImageRegistry(MilvusContainerImageTags.Registry) @@ -67,7 +102,8 @@ public static IResourceBuilder AddMilvus(this IDistributed { ctx.EnvironmentVariables["COMMON_SECURITY_DEFAULTROOTPASSWORD"] = milvus.ApiKeyParameter; }) - .WithArgs("milvus", "run", "standalone"); + .WithArgs("milvus", "run", "standalone") + .WithHealthCheck(healthCheckKey); } /// @@ -101,8 +137,29 @@ public static IResourceBuilder AddDatabase(this IResourc databaseName ??= name; builder.Resource.AddDatabase(name, databaseName); - var milvusResource = new MilvusDatabaseResource(name, databaseName, builder.Resource); - return builder.ApplicationBuilder.AddResource(milvusResource); + var milvusDatabaseResource = new MilvusDatabaseResource(name, databaseName, builder.Resource); + + string? connectionString = null; + + builder.ApplicationBuilder.Eventing.Subscribe(milvusDatabaseResource, async (@event, ct) => + { + connectionString = await milvusDatabaseResource.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false); + if (connectionString == null) + { + throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{milvusDatabaseResource.Name}' resource but the connection string was null."); + } + }); + + var healthCheckKey = $"{name}_check"; + builder.ApplicationBuilder.Services.AddHealthChecks().AddAsyncCheck(healthCheckKey, async () => + { + // TODO: Use health check from AspNetCore.Diagnostics.HealthChecks once it's implemented via this issue: + // https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2214 + return await HealthCheck(connectionString).ConfigureAwait(false); + }); + + return builder.ApplicationBuilder.AddResource(milvusDatabaseResource) + .WithHealthCheck(healthCheckKey); } /// @@ -190,4 +247,27 @@ private static void ConfigureAttuContainer(EnvironmentCallbackContext context, M // This will need to be refactored once updated service discovery APIs are available context.EnvironmentVariables.Add("MILVUS_URL", $"{resource.PrimaryEndpoint.Scheme}://{resource.Name}:{resource.PrimaryEndpoint.TargetPort}"); } + + private static async Task HealthCheck(string? connectionString) + { + if (connectionString is null) + { + throw new InvalidOperationException("Connection string is unavailable"); + } + + try + { + var milvusClient = new MilvusClient(connectionString); + + var milvusHealthState = await milvusClient.HealthAsync().ConfigureAwait(false); + + return milvusHealthState.IsHealthy + ? HealthCheckResult.Healthy() + : new HealthCheckResult(HealthStatus.Unhealthy, description: milvusHealthState.ToString()); + } + catch (Exception ex) + { + return new HealthCheckResult(HealthStatus.Unhealthy, exception: ex); + } + } } diff --git a/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs b/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs index 54b13550e1a..f5062e58139 100644 --- a/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs +++ b/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs @@ -2,10 +2,12 @@ // The .NET Foundation licenses this file to you under the MIT license. using Aspire.Components.Common.Tests; +using Aspire.Hosting.ApplicationModel; using Aspire.Hosting.Utils; using Grpc.Core; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Diagnostics.HealthChecks; using Microsoft.Extensions.Hosting; using Milvus.Client; using Polly; @@ -229,4 +231,96 @@ await pipeline.ExecuteAsync( } } } + + [Fact] + [RequiresDocker] + public async Task VerifyWaitForOnMilvusBlocksDependentResources() + { + var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3)); + using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper); + + var healthCheckTcs = new TaskCompletionSource(); + builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () => + { + return healthCheckTcs.Task; + }); + + var resource = builder.AddMilvus("resource") + .WithHealthCheck("blocking_check"); + + var dependentResource = builder.AddMilvus("dependentresource") + .WaitFor(resource); + + using var app = builder.Build(); + + var pendingStart = app.StartAsync(cts.Token); + + var rns = app.Services.GetRequiredService(); + + await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token); + + await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token); + + healthCheckTcs.SetResult(HealthCheckResult.Healthy()); + + await rns.WaitForResourceAsync(resource.Resource.Name, (re => re.Snapshot.HealthStatus == HealthStatus.Healthy), cts.Token); + + await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token); + + await pendingStart; + + await app.StopAsync(); + } + + [Fact] + [RequiresDocker] + public async Task VerifyWaitForOnMilvusDatabaseBlocksDependentResources() + { + var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper); + + var healthCheckTcs = new TaskCompletionSource(); + builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () => + { + return healthCheckTcs.Task; + }); + + var resource = builder.AddMilvus("resource") + .WithHealthCheck("blocking_check"); + + var db = resource.AddDatabase("db"); + + var dependentResource = builder.AddMilvus("dependentresource") + .WaitFor(db); + + using var app = builder.Build(); + + var pendingStart = app.StartAsync(cts.Token); + + var rns = app.Services.GetRequiredService(); + + await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token); + + await rns.WaitForResourceAsync(db.Resource.Name, KnownResourceStates.Running, cts.Token); + + await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token); + + healthCheckTcs.SetResult(HealthCheckResult.Healthy()); + + await rns.WaitForResourceAsync(resource.Resource.Name, (re => re.Snapshot.HealthStatus == HealthStatus.Healthy), cts.Token); + + // Create the database. + var connectionString = await resource.Resource.ConnectionStringExpression.GetValueAsync(cts.Token); + var milvusClient = new MilvusClient(connectionString!); + await milvusClient.CreateDatabaseAsync(db.Resource.Name); + + await rns.WaitForResourceAsync(db.Resource.Name, re => re.Snapshot.HealthStatus == HealthStatus.Healthy, cts.Token); + + await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token); + + await pendingStart; + + await app.StopAsync(); + } + } From 8f630e3702fe4af882121460eeffdc745027106a Mon Sep 17 00:00:00 2001 From: Alireza Baloochi Date: Sat, 14 Sep 2024 20:28:58 +0330 Subject: [PATCH 2/3] Update src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs Co-authored-by: David Fowler --- src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs b/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs index 2ab8ed5a44a..cc1e6c433af 100644 --- a/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs +++ b/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs @@ -73,6 +73,7 @@ public static IResourceBuilder AddMilvus(this IDistributed } var connectionStringAvailableEvent = new ConnectionStringAvailableEvent(databaseResource, @event.Services); await builder.Eventing.PublishAsync(connectionStringAvailableEvent, ct).ConfigureAwait(false); + var beforeResourceStartedEvent = new BeforeResourceStartedEvent(databaseResource, @event.Services); await builder.Eventing.PublishAsync(beforeResourceStartedEvent, ct).ConfigureAwait(false); } From 9550a2e7fa546a84d1016742585df48d978fbe46 Mon Sep 17 00:00:00 2001 From: Alireza Baloochi Date: Sat, 14 Sep 2024 23:59:32 +0330 Subject: [PATCH 3/3] Fix healthchecks and address PR feedback --- Directory.Build.props | 1 + .../Aspire.Hosting.Milvus.csproj | 7 +- .../MilvusBuilderExtensions.cs | 80 +++++++++++++------ .../Aspire.Hosting.Milvus.Tests.csproj | 4 - .../MilvusFunctionalTests.cs | 2 +- 5 files changed, 62 insertions(+), 32 deletions(-) diff --git a/Directory.Build.props b/Directory.Build.props index feef18c9263..86d123d0c29 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -7,6 +7,7 @@ preview MIT $(MSBuildThisFileDirectory)/src/Shared/ + $(MSBuildThisFileDirectory)/src/Components/ $([MSBuild]::NormalizeDirectory($(MSBuildThisFileDirectory), 'tests', 'Shared')) $([MSBuild]::NormalizeDirectory($(TestsSharedDir), 'RepoTesting')) $(MSBuildThisFileDirectory)/src/Vendoring/ diff --git a/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj b/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj index ad4fd4f7880..4e9e2700c24 100644 --- a/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj +++ b/src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj @@ -19,12 +19,17 @@ + - + + + + + diff --git a/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs b/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs index cc1e6c433af..098bc619692 100644 --- a/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs +++ b/src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs @@ -1,11 +1,14 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Data.Common; using Aspire.Hosting.ApplicationModel; using Aspire.Hosting.Milvus; using Aspire.Hosting.Utils; +using Aspire.Milvus.Client; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Diagnostics.HealthChecks; +using Microsoft.Extensions.Logging; using Milvus.Client; namespace Aspire.Hosting; @@ -55,6 +58,7 @@ public static IResourceBuilder AddMilvus(this IDistributed var milvus = new MilvusServerResource(name, apiKeyParameter); string? connectionString = null; + MilvusClient? milvusClient = null; builder.Eventing.Subscribe(milvus, async (@event, ct) => { @@ -63,7 +67,7 @@ public static IResourceBuilder AddMilvus(this IDistributed { throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{milvus.Name}' resource but the connection string was null."); } - + milvusClient = CreateMilvusClient(@event.Services, connectionString); var lookup = builder.Resources.OfType().ToDictionary(d => d.Name); foreach (var databaseName in milvus.Databases) { @@ -80,12 +84,15 @@ public static IResourceBuilder AddMilvus(this IDistributed }); var healthCheckKey = $"{name}_check"; - builder.Services.AddHealthChecks().AddAsyncCheck(healthCheckKey, async () => - { - // TODO: Use health check from AspNetCore.Diagnostics.HealthChecks once it's implemented via this issue: - // https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2214 - return await HealthCheck(connectionString).ConfigureAwait(false); - }); + // TODO: Use health check from AspNetCore.Diagnostics.HealthChecks once it's implemented via this issue: + // https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2214 + builder.Services.AddHealthChecks() + .Add(new HealthCheckRegistration( + healthCheckKey, + sp => new MilvusHealthCheck(milvusClient!), + failureStatus: default, + tags: default, + timeout: default)); return builder.AddResource(milvus) .WithImage(MilvusContainerImageTags.Image, MilvusContainerImageTags.Tag) @@ -141,7 +148,7 @@ public static IResourceBuilder AddDatabase(this IResourc var milvusDatabaseResource = new MilvusDatabaseResource(name, databaseName, builder.Resource); string? connectionString = null; - + MilvusClient? milvusClient = null; builder.ApplicationBuilder.Eventing.Subscribe(milvusDatabaseResource, async (@event, ct) => { connectionString = await milvusDatabaseResource.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false); @@ -149,15 +156,19 @@ public static IResourceBuilder AddDatabase(this IResourc { throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{milvusDatabaseResource.Name}' resource but the connection string was null."); } + milvusClient = CreateMilvusClient(@event.Services, connectionString); }); var healthCheckKey = $"{name}_check"; - builder.ApplicationBuilder.Services.AddHealthChecks().AddAsyncCheck(healthCheckKey, async () => - { - // TODO: Use health check from AspNetCore.Diagnostics.HealthChecks once it's implemented via this issue: - // https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2214 - return await HealthCheck(connectionString).ConfigureAwait(false); - }); + // TODO: Use health check from AspNetCore.Diagnostics.HealthChecks once it's implemented via this issue: + // https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2214 + builder.ApplicationBuilder.Services.AddHealthChecks() + .Add(new HealthCheckRegistration( + healthCheckKey, + sp => new MilvusHealthCheck(milvusClient!), + failureStatus: default, + tags: default, + timeout: default)); return builder.ApplicationBuilder.AddResource(milvusDatabaseResource) .WithHealthCheck(healthCheckKey); @@ -248,27 +259,44 @@ private static void ConfigureAttuContainer(EnvironmentCallbackContext context, M // This will need to be refactored once updated service discovery APIs are available context.EnvironmentVariables.Add("MILVUS_URL", $"{resource.PrimaryEndpoint.Scheme}://{resource.Name}:{resource.PrimaryEndpoint.TargetPort}"); } - - private static async Task HealthCheck(string? connectionString) + internal static MilvusClient CreateMilvusClient(IServiceProvider sp, string? connectionString) { if (connectionString is null) { throw new InvalidOperationException("Connection string is unavailable"); } - try - { - var milvusClient = new MilvusClient(connectionString); - - var milvusHealthState = await milvusClient.HealthAsync().ConfigureAwait(false); + Uri? endpoint = null; + string? key = null; + string? database = null; - return milvusHealthState.IsHealthy - ? HealthCheckResult.Healthy() - : new HealthCheckResult(HealthStatus.Unhealthy, description: milvusHealthState.ToString()); + if (Uri.TryCreate(connectionString, UriKind.Absolute, out var uri)) + { + endpoint = uri; } - catch (Exception ex) + else { - return new HealthCheckResult(HealthStatus.Unhealthy, exception: ex); + var connectionBuilder = new DbConnectionStringBuilder + { + ConnectionString = connectionString + }; + + if (connectionBuilder.ContainsKey("Endpoint") && Uri.TryCreate(connectionBuilder["Endpoint"].ToString(), UriKind.Absolute, out var serviceUri)) + { + endpoint = serviceUri; + } + + if (connectionBuilder.ContainsKey("Key")) + { + key = connectionBuilder["Key"].ToString(); + } + + if (connectionBuilder.ContainsKey("Database")) + { + database = connectionBuilder["Database"].ToString(); + } } + + return new MilvusClient(endpoint!, apiKey: key!, database: database, loggerFactory: sp.GetRequiredService()); } } diff --git a/tests/Aspire.Hosting.Milvus.Tests/Aspire.Hosting.Milvus.Tests.csproj b/tests/Aspire.Hosting.Milvus.Tests/Aspire.Hosting.Milvus.Tests.csproj index bd11f4ab0f3..bff36d23b5e 100644 --- a/tests/Aspire.Hosting.Milvus.Tests/Aspire.Hosting.Milvus.Tests.csproj +++ b/tests/Aspire.Hosting.Milvus.Tests/Aspire.Hosting.Milvus.Tests.csproj @@ -14,8 +14,4 @@ - - - - diff --git a/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs b/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs index f5062e58139..ad33dae5e7d 100644 --- a/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs +++ b/tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs @@ -311,7 +311,7 @@ public async Task VerifyWaitForOnMilvusDatabaseBlocksDependentResources() // Create the database. var connectionString = await resource.Resource.ConnectionStringExpression.GetValueAsync(cts.Token); - var milvusClient = new MilvusClient(connectionString!); + var milvusClient = MilvusBuilderExtensions.CreateMilvusClient(app.Services, connectionString); await milvusClient.CreateDatabaseAsync(db.Resource.Name); await rns.WaitForResourceAsync(db.Resource.Name, re => re.Snapshot.HealthStatus == HealthStatus.Healthy, cts.Token);