Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions playground/kafka/KafkaBasic.AppHost/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
.WithKafkaUI(kafkaUi => kafkaUi.WithHostPort(8080));

builder.AddProject<Projects.Producer>("producer")
.WithReference(kafka)
.WithReference(kafka).WaitFor(kafka)
.WithArgs(kafka.Resource.Name);

builder.AddProject<Projects.Consumer>("consumer")
.WithReference(kafka)
.WithReference(kafka).WaitFor(kafka)
.WithArgs(kafka.Resource.Name);

builder.AddKafka("kafka2").WithKafkaUI();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
<ProjectReference Include="..\WaitForSandbox.Common\WaitForSandbox.Common.csproj" />
</ItemGroup>

</Project>
</Project>
14 changes: 10 additions & 4 deletions src/Aspire.Hosting.Kafka/Aspire.Hosting.Kafka.csproj
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>$(DefaultTargetFramework)</TargetFramework>
<IsPackable>true</IsPackable>
<PackageTags>aspire integration hosting kafka</PackageTags>
<Description>Kafka support for .NET Aspire.</Description>
<TargetFramework>$(DefaultTargetFramework)</TargetFramework>
<IsPackable>true</IsPackable>
<PackageTags>aspire integration hosting kafka</PackageTags>
<Description>Kafka support for .NET Aspire.</Description>
</PropertyGroup>

<PropertyGroup>
Expand All @@ -14,6 +14,12 @@
<ItemGroup>
<Compile Include="$(SharedDir)VolumeNameGenerator.cs" Link="Utils\VolumeNameGenerator.cs" />
</ItemGroup>
<ItemGroup>
</ItemGroup>

<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.Kafka" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Aspire.Hosting\Aspire.Hosting.csproj" />
Expand Down
44 changes: 43 additions & 1 deletion src/Aspire.Hosting.Kafka/KafkaBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Utils;
using Confluent.Kafka;
using HealthChecks.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;

namespace Aspire.Hosting;

Expand All @@ -29,12 +33,50 @@ public static IResourceBuilder<KafkaServerResource> AddKafka(this IDistributedAp
ArgumentNullException.ThrowIfNull(name);

var kafka = new KafkaServerResource(name);

string? connectionString = null;

builder.Eventing.Subscribe<ConnectionStringAvailableEvent>(kafka, async (@event, ct) =>
{
connectionString = await kafka.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false);

if (connectionString == null)
{
throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{kafka.Name}' resource but the connection string was null.");
}
});

var healthCheckKey = $"{name}_check";

// NOTE: We cannot use AddKafka here because it registers the health check as a singleton
// which means if you have multiple Kafka resources the factory callback will end
// up using the connection string of the last Kafka resource that was added. The
// client packages also have to work around this issue.
//
// SEE: https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2298
var healthCheckRegistration = new HealthCheckRegistration(
healthCheckKey,
sp =>
{
var options = new KafkaHealthCheckOptions();
options.Configuration = new ProducerConfig();
options.Configuration.BootstrapServers = connectionString ?? throw new InvalidOperationException("Connection string is unavailable");
options.Configuration.SocketTimeoutMs = 1000;
options.Configuration.MessageTimeoutMs = 1000;
options.Configuration.StatisticsIntervalMs = 0;
return new KafkaHealthCheck(options);
},
failureStatus: default,
tags: default);
builder.Services.AddHealthChecks().Add(healthCheckRegistration);

return builder.AddResource(kafka)
.WithEndpoint(targetPort: KafkaBrokerPort, port: port, name: KafkaServerResource.PrimaryEndpointName)
.WithEndpoint(targetPort: KafkaInternalBrokerPort, name: KafkaServerResource.InternalEndpointName)
.WithImage(KafkaContainerImageTags.Image, KafkaContainerImageTags.Tag)
.WithImageRegistry(KafkaContainerImageTags.Registry)
.WithEnvironment(context => ConfigureKafkaContainer(context, kafka));
.WithEnvironment(context => ConfigureKafkaContainer(context, kafka))
.WithHealthCheck(healthCheckKey);
}

/// <summary>
Expand Down
42 changes: 42 additions & 0 deletions tests/Aspire.Hosting.Kafka.Tests/KafkaFunctionalTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Components.Common.Tests;
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Tests.Utils;
using Aspire.Hosting.Utils;
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using Polly;
using Xunit;
Expand All @@ -15,6 +17,46 @@ namespace Aspire.Hosting.Kafka.Tests;

public class KafkaFunctionalTests(ITestOutputHelper testOutputHelper)
{
[Fact]
[RequiresDocker]
public async Task VerifyWaitForOnKafkaBlocksDependentResources()
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
using var builder = TestDistributedApplicationBuilder.Create(testOutputHelper);

var healthCheckTcs = new TaskCompletionSource<HealthCheckResult>();
builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () =>
{
return healthCheckTcs.Task;
});

var resource = builder.AddKafka("resource")
.WithHealthCheck("blocking_check");

var dependentResource = builder.AddKafka("dependentresource")
.WaitFor(resource);

using var app = builder.Build();

var pendingStart = app.StartAsync(cts.Token);

var rns = app.Services.GetRequiredService<ResourceNotificationService>();

await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token);

healthCheckTcs.SetResult(HealthCheckResult.Healthy());

await rns.WaitForResourceAsync(resource.Resource.Name, (re => re.Snapshot.HealthStatus == HealthStatus.Healthy), cts.Token);

await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token);

await pendingStart;

await app.StopAsync();
}

[Fact]
[RequiresDocker]
public async Task VerifyKafkaResource()
Expand Down