Skip to content

Commit 65c6902

Browse files
authored
WaitFor for Kafka (#5719)
1 parent 332a0cc commit 65c6902

File tree

5 files changed

+95
-8
lines changed

5 files changed

+95
-8
lines changed

playground/kafka/KafkaBasic.AppHost/Program.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
.WithKafkaUI(kafkaUi => kafkaUi.WithHostPort(8080));
88

99
builder.AddProject<Projects.Producer>("producer")
10-
.WithReference(kafka)
10+
.WithReference(kafka).WaitFor(kafka)
1111
.WithArgs(kafka.Resource.Name);
1212

1313
builder.AddProject<Projects.Consumer>("consumer")
14-
.WithReference(kafka)
14+
.WithReference(kafka).WaitFor(kafka)
1515
.WithArgs(kafka.Resource.Name);
1616

1717
builder.AddKafka("kafka2").WithKafkaUI();

playground/waitfor/WaitForSandbox.ApiService/WaitForSandbox.ApiService.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@
1212
<ProjectReference Include="..\WaitForSandbox.Common\WaitForSandbox.Common.csproj" />
1313
</ItemGroup>
1414

15-
</Project>
15+
</Project>

src/Aspire.Hosting.Kafka/Aspire.Hosting.Kafka.csproj

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
4-
<TargetFramework>$(DefaultTargetFramework)</TargetFramework>
5-
<IsPackable>true</IsPackable>
6-
<PackageTags>aspire integration hosting kafka</PackageTags>
7-
<Description>Kafka support for .NET Aspire.</Description>
4+
<TargetFramework>$(DefaultTargetFramework)</TargetFramework>
5+
<IsPackable>true</IsPackable>
6+
<PackageTags>aspire integration hosting kafka</PackageTags>
7+
<Description>Kafka support for .NET Aspire.</Description>
88
</PropertyGroup>
99

1010
<PropertyGroup>
@@ -14,6 +14,12 @@
1414
<ItemGroup>
1515
<Compile Include="$(SharedDir)VolumeNameGenerator.cs" Link="Utils\VolumeNameGenerator.cs" />
1616
</ItemGroup>
17+
<ItemGroup>
18+
</ItemGroup>
19+
20+
<ItemGroup>
21+
<PackageReference Include="AspNetCore.HealthChecks.Kafka" />
22+
</ItemGroup>
1723

1824
<ItemGroup>
1925
<ProjectReference Include="..\Aspire.Hosting\Aspire.Hosting.csproj" />

src/Aspire.Hosting.Kafka/KafkaBuilderExtensions.cs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33

44
using Aspire.Hosting.ApplicationModel;
55
using Aspire.Hosting.Utils;
6+
using Confluent.Kafka;
7+
using HealthChecks.Kafka;
8+
using Microsoft.Extensions.DependencyInjection;
9+
using Microsoft.Extensions.Diagnostics.HealthChecks;
610

711
namespace Aspire.Hosting;
812

@@ -29,12 +33,47 @@ public static IResourceBuilder<KafkaServerResource> AddKafka(this IDistributedAp
2933
ArgumentNullException.ThrowIfNull(name);
3034

3135
var kafka = new KafkaServerResource(name);
36+
37+
string? connectionString = null;
38+
39+
builder.Eventing.Subscribe<ConnectionStringAvailableEvent>(kafka, async (@event, ct) =>
40+
{
41+
connectionString = await kafka.ConnectionStringExpression.GetValueAsync(ct).ConfigureAwait(false);
42+
43+
if (connectionString == null)
44+
{
45+
throw new DistributedApplicationException($"ConnectionStringAvailableEvent was published for the '{kafka.Name}' resource but the connection string was null.");
46+
}
47+
});
48+
49+
var healthCheckKey = $"{name}_check";
50+
51+
// NOTE: We cannot use AddKafka here because it registers the health check as a singleton
52+
// which means if you have multiple Kafka resources the factory callback will end
53+
// up using the connection string of the last Kafka resource that was added. The
54+
// client packages also have to work around this issue.
55+
//
56+
// SEE: https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks/issues/2298
57+
var healthCheckRegistration = new HealthCheckRegistration(
58+
healthCheckKey,
59+
sp =>
60+
{
61+
var options = new KafkaHealthCheckOptions();
62+
options.Configuration = new ProducerConfig();
63+
options.Configuration.BootstrapServers = connectionString ?? throw new InvalidOperationException("Connection string is unavailable");
64+
return new KafkaHealthCheck(options);
65+
},
66+
failureStatus: default,
67+
tags: default);
68+
builder.Services.AddHealthChecks().Add(healthCheckRegistration);
69+
3270
return builder.AddResource(kafka)
3371
.WithEndpoint(targetPort: KafkaBrokerPort, port: port, name: KafkaServerResource.PrimaryEndpointName)
3472
.WithEndpoint(targetPort: KafkaInternalBrokerPort, name: KafkaServerResource.InternalEndpointName)
3573
.WithImage(KafkaContainerImageTags.Image, KafkaContainerImageTags.Tag)
3674
.WithImageRegistry(KafkaContainerImageTags.Registry)
37-
.WithEnvironment(context => ConfigureKafkaContainer(context, kafka));
75+
.WithEnvironment(context => ConfigureKafkaContainer(context, kafka))
76+
.WithHealthCheck(healthCheckKey);
3877
}
3978

4079
/// <summary>

tests/Aspire.Hosting.Kafka.Tests/KafkaFunctionalTests.cs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33

44
using Aspire.Components.Common.Tests;
5+
using Aspire.Hosting.ApplicationModel;
56
using Aspire.Hosting.Tests.Utils;
67
using Aspire.Hosting.Utils;
78
using Confluent.Kafka;
89
using Microsoft.Extensions.DependencyInjection;
10+
using Microsoft.Extensions.Diagnostics.HealthChecks;
911
using Microsoft.Extensions.Hosting;
1012
using Polly;
1113
using Xunit;
@@ -15,6 +17,46 @@ namespace Aspire.Hosting.Kafka.Tests;
1517

1618
public class KafkaFunctionalTests(ITestOutputHelper testOutputHelper)
1719
{
20+
[Fact]
21+
[RequiresDocker]
22+
public async Task VerifyWaitForOnKafkaBlocksDependentResources()
23+
{
24+
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(3));
25+
using var builder = TestDistributedApplicationBuilder.Create(testOutputHelper);
26+
27+
var healthCheckTcs = new TaskCompletionSource<HealthCheckResult>();
28+
builder.Services.AddHealthChecks().AddAsyncCheck("blocking_check", () =>
29+
{
30+
return healthCheckTcs.Task;
31+
});
32+
33+
var resource = builder.AddKafka("resource")
34+
.WithHealthCheck("blocking_check");
35+
36+
var dependentResource = builder.AddKafka("dependentresource")
37+
.WaitFor(resource);
38+
39+
using var app = builder.Build();
40+
41+
var pendingStart = app.StartAsync(cts.Token);
42+
43+
var rns = app.Services.GetRequiredService<ResourceNotificationService>();
44+
45+
await rns.WaitForResourceAsync(resource.Resource.Name, KnownResourceStates.Running, cts.Token);
46+
47+
await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Waiting, cts.Token);
48+
49+
healthCheckTcs.SetResult(HealthCheckResult.Healthy());
50+
51+
await rns.WaitForResourceAsync(resource.Resource.Name, (re => re.Snapshot.HealthStatus == HealthStatus.Healthy), cts.Token);
52+
53+
await rns.WaitForResourceAsync(dependentResource.Resource.Name, KnownResourceStates.Running, cts.Token);
54+
55+
await pendingStart;
56+
57+
await app.StopAsync();
58+
}
59+
1860
[Fact]
1961
[RequiresDocker]
2062
public async Task VerifyKafkaResource()

0 commit comments

Comments
 (0)