Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<PackageVersion Include="Azure.AI.OpenAI" Version="2.0.0" />
<PackageVersion Include="Azure.Data.Tables" Version="12.9.1" />
<PackageVersion Include="Azure.Extensions.AspNetCore.Configuration.Secrets" Version="1.3.2" />
<PackageVersion Include="Azure.Messaging.EventHubs" Version="5.11.5" />
<PackageVersion Include="Azure.Messaging.EventHubs.Processor" Version="5.11.5" />
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.18.1" />
<PackageVersion Include="Azure.Search.Documents" Version="11.6.0" />
Expand Down Expand Up @@ -72,6 +73,7 @@
<!-- AspNetCore.HealthChecks dependencies (3rd party packages) -->
<PackageVersion Include="AspNetCore.HealthChecks.Azure.Data.Tables" Version="8.0.1" />
<PackageVersion Include="AspNetCore.HealthChecks.Azure.KeyVault.Secrets" Version="8.0.1" />
<PackageVersion Include="AspNetCore.HealthChecks.Azure.Messaging.EventHubs" Version="8.0.1" />
<PackageVersion Include="AspNetCore.HealthChecks.Azure.Storage.Blobs" Version="8.0.1" />
<PackageVersion Include="AspNetCore.HealthChecks.Azure.Storage.Queues" Version="8.0.1" />
<PackageVersion Include="AspNetCore.HealthChecks.AzureServiceBus" Version="8.0.1" />
Expand Down
4 changes: 2 additions & 2 deletions playground/AspireEventHub/EventHubs.AppHost/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
.AddEventHub("hub");

builder.AddProject<Projects.EventHubsConsumer>("consumer")
.WithReference(eventHub)
.WithReference(eventHub).WaitFor(eventHub)
.WithReference(blob);

builder.AddProject<Projects.EventHubsApi>("api")
.WithExternalHttpEndpoints()
.WithReference(eventHub);
.WithReference(eventHub).WaitFor(eventHub);

builder.Build().Run();
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
<ProjectReference Include="..\Aspire.Hosting.Azure\Aspire.Hosting.Azure.csproj" />
<PackageReference Include="Azure.Provisioning" />
<PackageReference Include="Azure.Provisioning.EventHubs" />
<PackageReference Include="Azure.Messaging.EventHubs" />
<PackageReference Include="AspNetCore.HealthChecks.Azure.Messaging.EventHubs" />
</ItemGroup>

</Project>
31 changes: 31 additions & 0 deletions src/Aspire.Hosting.Azure.EventHubs/AzureEventHubsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
using Aspire.Hosting.Azure;
using Aspire.Hosting.Azure.EventHubs;
using Aspire.Hosting.Utils;
using Azure.Messaging.EventHubs.Producer;
using Azure.Provisioning;
using Azure.Provisioning.EventHubs;
using Azure.Provisioning.Expressions;
using Microsoft.Extensions.DependencyInjection;

namespace Aspire.Hosting;

Expand Down Expand Up @@ -187,6 +189,35 @@ public static IResourceBuilder<AzureEventHubsResource> RunAsEmulator(this IResou
context.EnvironmentVariables.Add("METADATA_SERVER", $"{tableEndpoint.Resource.Name}:{tableEndpoint.TargetPort}");
}));

EventHubProducerClient? client = null;

builder.ApplicationBuilder.Eventing.Subscribe<ConnectionStringAvailableEvent>(builder.Resource, async (@event, ct) =>
{
var connectionString = await builder.Resource.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false)
?? throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{builder.Resource.Name}' resource but the connection string was null.");

// For the purposes of the health check we only need to know a hub name. If we don't have a hub
// name we can't configure a valid producer client connection so we should throw. What good is
// an event hub namespace without an event hub? :)
if (builder.Resource.Hubs is { Count: > 0 } && builder.Resource.Hubs[0] is { } hub)
{
var healthCheckConnectionString = $"{connectionString};EntityPath={hub.Name};";
client = new EventHubProducerClient(healthCheckConnectionString);
}
else
{
throw new DistributedApplicationException($"The '{builder.Resource.Name}' resource does not have any Event Hubs.");
}
});

var healthCheckKey = $"{builder.Resource.Name}_check";
builder.ApplicationBuilder.Services.AddHealthChecks().AddAzureEventHub(
sp => client ?? throw new DistributedApplicationException("EventHubProducerClient is not initialized"),
healthCheckKey
);

builder.WithHealthCheck(healthCheckKey);

if (configureContainer != null)
{
var surrogate = new AzureEventHubsEmulatorResource(builder.Resource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,57 @@
using Aspire.Hosting.Azure.EventHubs;
using Xunit;
using Aspire.Hosting.ApplicationModel;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Aspire.Components.Common.Tests;
using Xunit.Abstractions;

namespace Aspire.Hosting.Azure.Tests;

public class AzureEventHubsExtensionsTests
public class AzureEventHubsExtensionsTests(ITestOutputHelper testOutputHelper)
{
[Fact]
[RequiresDocker]
public async Task VerifyWaitForOnEventHubsEmulatorBlocksDependentResources()
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
using var builder = TestDistributedApplicationBuilder.Create(testOutputHelper);

var healthCheckTcs = new TaskCompletionSource<HealthCheckResult>();
builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () =>
{
return healthCheckTcs.Task;
});

var resource = builder.AddAzureEventHubs("resource")
.AddEventHub("hubx")
.RunAsEmulator()
.WithHealthCheck("blocking_check");

var dependentResource = builder.AddContainer("nginx", "mcr.microsoft.com/cbl-mariner/base/nginx", "1.22")
.WaitFor(resource);

using var app = builder.Build();

var pendingStart = app.StartAsync(cts.Token);

var rns = app.Services.GetRequiredService<ResourceNotificationService>();

await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token);

healthCheckTcs.SetResult(HealthCheckResult.Healthy());

await rns.WaitForResourceHealthyAsync(resource.Resource.Name, cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token);

await pendingStart;

await app.StopAsync();
}

[Fact]
public void AzureEventHubsUseEmulatorCallbackWithWithDataBindMountResultsInBindMountAnnotationWithDefaultPath()
{
Expand Down