Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

<ItemGroup>
<AspireProjectOrPackageReference Include="Aspire.Azure.Storage.Blobs" />
<AspireProjectOrPackageReference Include="Aspire.Azure.Storage.Queues" />
<ProjectReference Include="..\..\Playground.ServiceDefaults\Playground.ServiceDefaults.csproj" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@
// The .NET Foundation licenses this file to you under the MIT license.

using Azure.Storage.Blobs;
using Azure.Storage.Queues;

var builder = WebApplication.CreateBuilder(args);

builder.AddServiceDefaults();

builder.AddAzureBlobClient("blobs");
builder.AddAzureQueueClient("queues");

var app = builder.Build();

app.MapDefaultEndpoints();
app.MapGet("/", async (BlobServiceClient bsc) =>
app.MapGet("/", async (BlobServiceClient bsc, QueueServiceClient qsc) =>
{
var container = bsc.GetBlobContainerClient("mycontainer");
await container.CreateIfNotExistsAsync();
Expand All @@ -29,6 +31,10 @@
blobNames.Add(blob.Name);
}

var queue = qsc.GetQueueClient("myqueue");
await queue.CreateIfNotExistsAsync();
await queue.SendMessageAsync("Hello, world!");

return blobNames;
});

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

var builder = DistributedApplication.CreateBuilder(args);

var storage = builder.AddAzureStorage("storage").RunAsEmulator(container =>
Expand All @@ -8,10 +9,12 @@
});

var blobs = storage.AddBlobs("blobs");
var queues = storage.AddQueues("queues");

builder.AddProject<Projects.AzureStorageEndToEnd_ApiService>("api")
.WithExternalHttpEndpoints()
.WithReference(blobs);
.WithReference(blobs).WaitFor(blobs)
.WithReference(queues).WaitFor(queues);

#if !SKIP_DASHBOARD_REFERENCE
// This project is only added in playground projects to support development/debugging
Expand Down
2 changes: 1 addition & 1 deletion playground/mongo/Mongo.ApiService/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
var builder = WebApplication.CreateBuilder(args);

builder.AddServiceDefaults();
builder.AddMongoDBClient("mongo");
builder.AddMongoDBClient("db");

var app = builder.Build();

Expand Down
3 changes: 2 additions & 1 deletion playground/mongo/Mongo.AppHost/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

var db = builder.AddMongoDB("mongo")
.WithMongoExpress(c => c.WithHostPort(3022))
.PublishAsContainer();
.PublishAsContainer()
.AddDatabase("db");

builder.AddProject<Projects.Mongo_ApiService>("api")
.WithExternalHttpEndpoints()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
<Compile Include="$(SharedDir)VolumeNameGenerator.cs" Link="Utils\VolumeNameGenerator.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.Azure.Storage.Blobs" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Aspire.Hosting.Azure\Aspire.Hosting.Azure.csproj" />
<PackageReference Include="Azure.Provisioning" />
Expand Down
71 changes: 70 additions & 1 deletion src/Aspire.Hosting.Azure.Storage/AzureStorageExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
using Aspire.Hosting.Azure;
using Aspire.Hosting.Azure.Storage;
using Aspire.Hosting.Utils;
using Azure.Identity;
using Azure.Provisioning;
using Azure.Provisioning.Storage;
using Azure.Storage.Blobs;
using Microsoft.Extensions.DependencyInjection;
//using Microsoft.Extensions.DependencyInjection;

namespace Aspire.Hosting;

Expand Down Expand Up @@ -85,11 +89,58 @@ public static IResourceBuilder<AzureStorageResource> AddAzureStorage(this IDistr
};
var resource = new AzureStorageResource(name, configureConstruct);

var lockObject = new object();
BlobServiceClient? blobServiceClient = null;

builder.Eventing.Subscribe<ConnectionStringAvailableEvent>(resource, async (@event, ct) =>
{
// HACK: AzureStorageResource does not implement IResourceWithConnectionString which means that
// we don't fire off a ConnectionStringAvailableEvent for it. To work around this the blob,
// table, and queue resources each propogate the event to the parent (the eventing system
// doesn't care about connection strings. For the actual health check we just use the
// blob service client in the parent rather than each of the child resources and rely
// on the resource health check service to propogate the health checks.
//
// TODO: There is potential race condition here to solve.
if (blobServiceClient != null)
{
return;
}

var connectionString = await resource.GetBlobConnectionString().GetValueAsync(ct).ConfigureAwait(false);

if (connectionString == null)
{
throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{resource.Name}' resource but the connection string was null.");
}

blobServiceClient = CreateBlobServiceClient(connectionString);
});

var healthCheckKey = $"{name}_check";
builder.Services.AddHealthChecks().AddAzureBlobStorage(sp =>
{
return blobServiceClient ?? throw new InvalidOperationException("BlobServiceClient is not initialized.");
}, name: healthCheckKey);

return builder.AddResource(resource)
// These ambient parameters are only available in development time.
.WithParameter(AzureBicepResource.KnownParameters.PrincipalId)
.WithParameter(AzureBicepResource.KnownParameters.PrincipalType)
.WithManifestPublishingCallback(resource.WriteToManifest);
.WithManifestPublishingCallback(resource.WriteToManifest)
.WithHealthCheck(healthCheckKey);

static BlobServiceClient CreateBlobServiceClient(string connectionString)
{
if (Uri.TryCreate(connectionString, UriKind.Absolute, out var uri))
{
return new BlobServiceClient(uri, new DefaultAzureCredential());
}
else
{
return new BlobServiceClient(connectionString);
}
}
}

/// <summary>
Expand Down Expand Up @@ -197,6 +248,12 @@ public static IResourceBuilder<AzureBlobStorageResource> AddBlobs(this IResource
{
var resource = new AzureBlobStorageResource(name, builder.Resource);

builder.ApplicationBuilder.Eventing.Subscribe<ConnectionStringAvailableEvent>(resource, async (@event, ct) =>
{
var propogatedEvent = new ConnectionStringAvailableEvent(resource.Parent, @event.Services);
await builder.ApplicationBuilder.Eventing.PublishAsync(propogatedEvent, ct).ConfigureAwait(false);
});

return builder.ApplicationBuilder.AddResource(resource);
}

Expand All @@ -210,6 +267,12 @@ public static IResourceBuilder<AzureTableStorageResource> AddTables(this IResour
{
var resource = new AzureTableStorageResource(name, builder.Resource);

builder.ApplicationBuilder.Eventing.Subscribe<ConnectionStringAvailableEvent>(resource, async (@event, ct) =>
{
var propogatedEvent = new ConnectionStringAvailableEvent(resource.Parent, @event.Services);
await builder.ApplicationBuilder.Eventing.PublishAsync(propogatedEvent, ct).ConfigureAwait(false);
});

return builder.ApplicationBuilder.AddResource(resource);
}

Expand All @@ -223,6 +286,12 @@ public static IResourceBuilder<AzureQueueStorageResource> AddQueues(this IResour
{
var resource = new AzureQueueStorageResource(name, builder.Resource);

builder.ApplicationBuilder.Eventing.Subscribe<ConnectionStringAvailableEvent>(resource, async (@event, ct) =>
{
var propogatedEvent = new ConnectionStringAvailableEvent(resource.Parent, @event.Services);
await builder.ApplicationBuilder.Eventing.PublishAsync(propogatedEvent, ct).ConfigureAwait(false);
});

return builder.ApplicationBuilder.AddResource(resource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ IDistributedApplicationEventing eventing
return azureResources;
}

private ILookup<IResource, IResourceWithParent>? _parentChildLookup;

public async Task BeforeStartAsync(DistributedApplicationModel appModel, CancellationToken cancellationToken = default)
{
var azureResources = GetAzureResourcesFromAppModel(appModel);
Expand All @@ -92,8 +94,15 @@ public async Task BeforeStartAsync(DistributedApplicationModel appModel, Cancell
return;
}

// Create a map of parents to their children used to propagate state changes later.
var parentChildLookup = appModel.Resources.OfType<IResourceWithParent>().ToLookup(r => r.Parent);
static IResource? SelectParentResource(IResource resource) => resource switch
{
IAzureResource ar => ar,
IResourceWithParent rp => SelectParentResource(rp.Parent),
_ => null
};

// Create a map of parents to their children used to propogate state changes later.
_parentChildLookup = appModel.Resources.OfType<IResourceWithParent>().ToLookup(r => r.Parent);

// Sets the state of the resource and all of its children
async Task UpdateStateAsync((IResource Resource, IAzureResource AzureResource) resource, Func<CustomResourceSnapshot, CustomResourceSnapshot> stateFactory)
Expand All @@ -110,7 +119,7 @@ async Task UpdateStateAsync((IResource Resource, IAzureResource AzureResource) r

// We basically want child resources to be moved into the same state as their parent resources whenever
// there is a state update. This is done for us in DCP so we replicate the behavior here in the Azure Provisioner.
var childResources = parentChildLookup[resource.Resource];
var childResources = _parentChildLookup[resource.Resource];
foreach (var child in childResources)
{
await notificationService.PublishUpdateAsync(child, stateFactory).ConfigureAwait(false);
Expand Down Expand Up @@ -327,6 +336,15 @@ async Task PublishConnectionStringAvailableEventAsync()
{
var connectionStringAvailableEvent = new ConnectionStringAvailableEvent(resource.Resource, serviceProvider);
await eventing.PublishAsync(connectionStringAvailableEvent, cancellationToken).ConfigureAwait(false);

if (_parentChildLookup![resource.Resource] is { } children)
{
foreach (var child in children.OfType<IResourceWithConnectionString>())
{
var childConnectionStringAvailableEvent = new ConnectionStringAvailableEvent(child, serviceProvider);
await eventing.PublishAsync(childConnectionStringAvailableEvent, cancellationToken).ConfigureAwait(false);
}
}
}
}

Expand Down
15 changes: 15 additions & 0 deletions src/Aspire.Hosting.MongoDB/MongoDBBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,21 @@ public static IResourceBuilder<MongoDBDatabaseResource> AddDatabase(this IResour
builder.Resource.AddDatabase(name, databaseName);
var mongoDBDatabase = new MongoDBDatabaseResource(name, databaseName, builder.Resource);

string? connectionString = null;

builder.ApplicationBuilder.Eventing.Subscribe<ConnectionStringAvailableEvent>(mongoDBDatabase, async (@event, ct) =>
{
connectionString = await mongoDBDatabase.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false);

if (connectionString == null)
{
throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{mongoDBDatabase.Name}' resource but the connection string was null.");
}
});

var healthCheckKey = $"{name}_check";
builder.ApplicationBuilder.Services.AddHealthChecks().AddMongoDb(sp => connectionString ?? throw new InvalidOperationException("Connection string is unavailable"), name: healthCheckKey);

return builder.ApplicationBuilder
.AddResource(mongoDBDatabase);
}
Expand Down
29 changes: 21 additions & 8 deletions src/Aspire.Hosting/Dcp/ApplicationExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1251,14 +1251,31 @@ await notificationService.PublishUpdateAsync(cr.ModelResource, s => s with
}
}

private async Task CreateExecutableAsync(AppResource er, ILogger resourceLogger, CancellationToken cancellationToken)
private async Task PublishConnectionStringAvailableEvent(IResource resource, CancellationToken cancellationToken)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need this still?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes we still have resources that depend on it (Postgres/Redis etc).

{
if (er.ModelResource is IResourceWithConnectionString)
// If the resource itself has a connection string then publish that the connection string is available.
if (resource is IResourceWithConnectionString)
{
var connectionStringAvailableEvent = new ConnectionStringAvailableEvent(er.ModelResource, serviceProvider);
var connectionStringAvailableEvent = new ConnectionStringAvailableEvent(resource, serviceProvider);
await eventing.PublishAsync(connectionStringAvailableEvent, cancellationToken).ConfigureAwait(false);
}

// Sometimes the container/executable itself does not have a connection string, and in those cases
// we need to dispatch the event for the children.
if (_parentChildLookup[resource] is { } children)
{
foreach (var child in children.OfType<IResourceWithConnectionString>())
{
var childConnectionStringAvailableEvent = new ConnectionStringAvailableEvent(child, serviceProvider);
await eventing.PublishAsync(childConnectionStringAvailableEvent, cancellationToken).ConfigureAwait(false);
}
}
}

private async Task CreateExecutableAsync(AppResource er, ILogger resourceLogger, CancellationToken cancellationToken)
{
await PublishConnectionStringAvailableEvent(er.ModelResource, cancellationToken).ConfigureAwait(false);

var beforeResourceStartedEvent = new BeforeResourceStartedEvent(er.ModelResource, serviceProvider);
await eventing.PublishAsync(beforeResourceStartedEvent, cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -1550,11 +1567,7 @@ await notificationService.PublishUpdateAsync(cr.ModelResource, s => s with

private async Task CreateContainerAsync(AppResource cr, ILogger resourceLogger, CancellationToken cancellationToken)
{
if (cr.ModelResource is IResourceWithConnectionString)
{
var connectionStringAvailableEvent = new ConnectionStringAvailableEvent(cr.ModelResource, serviceProvider);
await eventing.PublishAsync(connectionStringAvailableEvent, cancellationToken).ConfigureAwait(false);
}
await PublishConnectionStringAvailableEvent(cr.ModelResource, cancellationToken).ConfigureAwait(false);

var beforeResourceStartedEvent = new BeforeResourceStartedEvent(cr.ModelResource, serviceProvider);
await eventing.PublishAsync(beforeResourceStartedEvent, cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Components.Common.Tests;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Utils;
using Azure.Storage.Blobs;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using Xunit;
using Xunit.Abstractions;
Expand All @@ -13,6 +15,56 @@ namespace Aspire.Hosting.Azure.Tests;

public class AzureStorageEmulatorFunctionalTests(ITestOutputHelper testOutputHelper)
{
[Fact]
[RequiresDocker]
public async Task VerifyWaitForOnAzureStorageEmulatorForBlobsBlocksDependentResources()
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
using var builder = TestDistributedApplicationBuilder.Create(testOutputHelper);

var healthCheckTcs = new TaskCompletionSource<HealthCheckResult>();
builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () =>
{
return healthCheckTcs.Task;
});

var storage = builder.AddAzureStorage("resource")
.RunAsEmulator()
.WithHealthCheck("blocking_check");

var blobs = storage.AddBlobs("blobs");
var queues = storage.AddQueues("queues");
var tables = storage.AddTables("tables");

var dependentResource = builder.AddAzureCosmosDB("dependentresource")
.RunAsEmulator()
.WaitFor(blobs)
.WaitFor(queues)
.WaitFor(tables);

using var app = builder.Build();

var pendingStart = app.StartAsync(cts.Token);

var rns = app.Services.GetRequiredService<ResourceNotificationService>();

await rns.WaitForResourceAsync(storage.Resource.Name, KnownResourceStates.Running, cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token);

healthCheckTcs.SetResult(HealthCheckResult.Healthy());

await rns.WaitForResourceHealthyAsync(blobs.Resource.Name, cts.Token);
await rns.WaitForResourceHealthyAsync(queues.Resource.Name, cts.Token);
await rns.WaitForResourceHealthyAsync(tables.Resource.Name, cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token);

await pendingStart;

await app.StopAsync();
}

[Fact]
[RequiresDocker]
public async Task VerifyAzureStorageEmulatorResource()
Expand Down