Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<LangVersion>preview</LangVersion>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<SharedDir>$(MSBuildThisFileDirectory)/src/Shared/</SharedDir>
<ComponentsDir>$(MSBuildThisFileDirectory)/src/Components/</ComponentsDir>
<TestsSharedDir>$([MSBuild]::NormalizeDirectory($(MSBuildThisFileDirectory), 'tests', 'Shared'))</TestsSharedDir>
<TestsSharedRepoTestingDir>$([MSBuild]::NormalizeDirectory($(TestsSharedDir), 'RepoTesting'))</TestsSharedRepoTestingDir>
<VendoringDir>$(MSBuildThisFileDirectory)/src/Vendoring/</VendoringDir>
Expand Down
12 changes: 11 additions & 1 deletion src/Aspire.Hosting.Milvus/Aspire.Hosting.Milvus.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
<PackageTags>aspire integration hosting milvus database vector search</PackageTags>
<Description>Milvus vector database support for .NET Aspire.</Description>
<PackageIconFullPath>$(SharedDir)Milvus_256x.png</PackageIconFullPath>
<NoWarn>$(NoWarn);CS8002</NoWarn><!-- Milvus.Client packages are not signed -->

<!-- Disable package validation as this package hasn't shipped yet. -->
<EnablePackageValidation>false</EnablePackageValidation>
</PropertyGroup>
Expand All @@ -17,10 +19,18 @@
<ItemGroup>
<Compile Include="$(SharedDir)StringComparers.cs" Link="Utils\StringComparers.cs" />
<Compile Include="$(SharedDir)VolumeNameGenerator.cs" Link="Utils\VolumeNameGenerator.cs" />
<Compile Include="$(ComponentsDir)Aspire.Milvus.Client\MilvusHealthCheck.cs" Link="MilvusHealthCheck.cs"></Compile>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Aspire.Hosting\Aspire.Hosting.csproj" />
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="Aspire.Hosting.Milvus.Tests"></InternalsVisibleTo>
</ItemGroup>

<ItemGroup>
<PackageReference Include="Milvus.Client" />
</ItemGroup>
</Project>
115 changes: 112 additions & 3 deletions src/Aspire.Hosting.Milvus/MilvusBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Data.Common;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Milvus;
using Aspire.Hosting.Utils;
using Aspire.Milvus.Client;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using Milvus.Client;

namespace Aspire.Hosting;

Expand Down Expand Up @@ -51,6 +57,43 @@ public static IResourceBuilder<MilvusServerResource> AddMilvus(this IDistributed

var milvus = new MilvusServerResource(name, apiKeyParameter);

string? connectionString = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably technically don't need the connection string here since you have the client.

MilvusClient? milvusClient = null;

builder.Eventing.Subscribe<ConnectionStringAvailableEvent>(milvus, async (@event, ct) =>
{
connectionString = await milvus.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false);
if (connectionString is null)
{
throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{milvus.Name}' resource but the connection string was null.");
}
milvusClient = CreateMilvusClient(@event.Services, connectionString);
var lookup = builder.Resources.OfType<MilvusDatabaseResource>().ToDictionary(d => d.Name);
foreach (var databaseName in milvus.Databases)
{
if (!lookup.TryGetValue(databaseName.Key, out var databaseResource))
{
throw new DistributedApplicationException($"Database resource '{databaseName}' under Milvus Server resource '{milvus.Name}' was not found in the model.");
}
var connectionStringAvailableEvent = new ConnectionStringAvailableEvent(databaseResource, @event.Services);
await builder.Eventing.PublishAsync<ConnectionStringAvailableEvent>(connectionStringAvailableEvent, ct).ConfigureAwait(false);

var beforeResourceStartedEvent = new BeforeResourceStartedEvent(databaseResource, @event.Services);
await builder.Eventing.PublishAsync(beforeResourceStartedEvent, ct).ConfigureAwait(false);
}
});

var healthCheckKey = $"{name}_check";
// TODO: Use health check from AspNetCore.Diagnostics.HealthChecks once it's implemented via this issue:
// https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2214
builder.Services.AddHealthChecks()
.Add(new HealthCheckRegistration(
healthCheckKey,
sp => new MilvusHealthCheck(milvusClient!),
failureStatus: default,
tags: default,
timeout: default));

return builder.AddResource(milvus)
.WithImage(MilvusContainerImageTags.Image, MilvusContainerImageTags.Tag)
.WithImageRegistry(MilvusContainerImageTags.Registry)
Expand All @@ -67,7 +110,8 @@ public static IResourceBuilder<MilvusServerResource> AddMilvus(this IDistributed
{
ctx.EnvironmentVariables["COMMON_SECURITY_DEFAULTROOTPASSWORD"] = milvus.ApiKeyParameter;
})
.WithArgs("milvus", "run", "standalone");
.WithArgs("milvus", "run", "standalone")
.WithHealthCheck(healthCheckKey);
}

/// <summary>
Expand Down Expand Up @@ -101,8 +145,33 @@ public static IResourceBuilder<MilvusDatabaseResource> AddDatabase(this IResourc
databaseName ??= name;

builder.Resource.AddDatabase(name, databaseName);
var milvusResource = new MilvusDatabaseResource(name, databaseName, builder.Resource);
return builder.ApplicationBuilder.AddResource(milvusResource);
var milvusDatabaseResource = new MilvusDatabaseResource(name, databaseName, builder.Resource);

string? connectionString = null;
MilvusClient? milvusClient = null;
builder.ApplicationBuilder.Eventing.Subscribe<ConnectionStringAvailableEvent>(milvusDatabaseResource, async (@event, ct) =>
{
connectionString = await milvusDatabaseResource.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false);
if (connectionString == null)
{
throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{milvusDatabaseResource.Name}' resource but the connection string was null.");
}
milvusClient = CreateMilvusClient(@event.Services, connectionString);
});

var healthCheckKey = $"{name}_check";
// TODO: Use health check from AspNetCore.Diagnostics.HealthChecks once it's implemented via this issue:
// https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2214
builder.ApplicationBuilder.Services.AddHealthChecks()
.Add(new HealthCheckRegistration(
healthCheckKey,
sp => new MilvusHealthCheck(milvusClient!),
failureStatus: default,
tags: default,
timeout: default));

return builder.ApplicationBuilder.AddResource(milvusDatabaseResource)
.WithHealthCheck(healthCheckKey);
}

/// <summary>
Expand Down Expand Up @@ -190,4 +259,44 @@ private static void ConfigureAttuContainer(EnvironmentCallbackContext context, M
// This will need to be refactored once updated service discovery APIs are available
context.EnvironmentVariables.Add("MILVUS_URL", $"{resource.PrimaryEndpoint.Scheme}://{resource.Name}:{resource.PrimaryEndpoint.TargetPort}");
}
internal static MilvusClient CreateMilvusClient(IServiceProvider sp, string? connectionString)
{
if (connectionString is null)
Copy link
Member

@davidfowl davidfowl Sep 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we would share this logic:

public sealed class MilvusClientSettings

@eerhardt thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @davidfowl we should do this because the created Milvus connection string in app-host isn't compatible with MilvusClient and those properties (database, endpoint, key) should be extracted before creating MilvusClient.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was there a reason this wasn't done?

{
throw new InvalidOperationException("Connection string is unavailable");
}

Uri? endpoint = null;
string? key = null;
string? database = null;

if (Uri.TryCreate(connectionString, UriKind.Absolute, out var uri))
{
endpoint = uri;
}
else
{
var connectionBuilder = new DbConnectionStringBuilder
{
ConnectionString = connectionString
};

if (connectionBuilder.ContainsKey("Endpoint") && Uri.TryCreate(connectionBuilder["Endpoint"].ToString(), UriKind.Absolute, out var serviceUri))
{
endpoint = serviceUri;
}

if (connectionBuilder.ContainsKey("Key"))
{
key = connectionBuilder["Key"].ToString();
}

if (connectionBuilder.ContainsKey("Database"))
{
database = connectionBuilder["Database"].ToString();
}
}

return new MilvusClient(endpoint!, apiKey: key!, database: database, loggerFactory: sp.GetRequiredService<ILoggerFactory>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,4 @@
<PackageReference Include="Microsoft.Extensions.Http.Resilience" />
</ItemGroup>

<ItemGroup>
<Compile Include="$(RepoRoot)src\Aspire.Hosting.Milvus\MilvusContainerImageTags.cs" />
<Compile Include="$(SharedDir)VolumeNameGenerator.cs" Link="Utils\VolumeNameGenerator.cs" />
</ItemGroup>
</Project>
94 changes: 94 additions & 0 deletions tests/Aspire.Hosting.Milvus.Tests/MilvusFunctionalTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Components.Common.Tests;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Utils;
using Grpc.Core;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using Milvus.Client;
using Polly;
Expand Down Expand Up @@ -229,4 +231,96 @@ await pipeline.ExecuteAsync(
}
}
}

[Fact]
[RequiresDocker]
public async Task VerifyWaitForOnMilvusBlocksDependentResources()
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);

var healthCheckTcs = new TaskCompletionSource<HealthCheckResult>();
builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () =>
{
return healthCheckTcs.Task;
});

var resource = builder.AddMilvus("resource")
.WithHealthCheck("blocking_check");

var dependentResource = builder.AddMilvus("dependentresource")
.WaitFor(resource);

using var app = builder.Build();

var pendingStart = app.StartAsync(cts.Token);

var rns = app.Services.GetRequiredService<ResourceNotificationService>();

await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token);

healthCheckTcs.SetResult(HealthCheckResult.Healthy());

await rns.WaitForResourceAsync(resource.Resource.Name, (re => re.Snapshot.HealthStatus == HealthStatus.Healthy), cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token);

await pendingStart;

await app.StopAsync();
}

[Fact]
[RequiresDocker]
public async Task VerifyWaitForOnMilvusDatabaseBlocksDependentResources()
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
using var builder = TestDistributedApplicationBuilder.CreateWithTestContainerRegistry(testOutputHelper);

var healthCheckTcs = new TaskCompletionSource<HealthCheckResult>();
builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () =>
{
return healthCheckTcs.Task;
});

var resource = builder.AddMilvus("resource")
.WithHealthCheck("blocking_check");

var db = resource.AddDatabase("db");

var dependentResource = builder.AddMilvus("dependentresource")
.WaitFor(db);

using var app = builder.Build();

var pendingStart = app.StartAsync(cts.Token);

var rns = app.Services.GetRequiredService<ResourceNotificationService>();

await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token);

await rns.WaitForResourceAsync(db.Resource.Name, KnownResourceStates.Running, cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token);

healthCheckTcs.SetResult(HealthCheckResult.Healthy());

await rns.WaitForResourceAsync(resource.Resource.Name, (re => re.Snapshot.HealthStatus == HealthStatus.Healthy), cts.Token);

// Create the database.
var connectionString = await resource.Resource.ConnectionStringExpression.GetValueAsync(cts.Token);
var milvusClient = MilvusBuilderExtensions.CreateMilvusClient(app.Services, connectionString);
await milvusClient.CreateDatabaseAsync(db.Resource.Name);

await rns.WaitForResourceAsync(db.Resource.Name, re => re.Snapshot.HealthStatus == HealthStatus.Healthy, cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token);

await pendingStart;

await app.StopAsync();
}

}