diff --git a/src/Dapr.Testcontainers/AssemblyInfo.cs b/src/Dapr.Testcontainers/AssemblyInfo.cs new file mode 100644 index 000000000..8601f0c07 --- /dev/null +++ b/src/Dapr.Testcontainers/AssemblyInfo.cs @@ -0,0 +1,16 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Dapr.Testcontainers.Test, PublicKey=0024000004800000940000000602000000240000525341310004000001000100b1f597635c44597fcecb493e2b1327033b29b1a98ac956a1a538664b68f87d45fbaada0438a15a6265e62864947cc067d8da3a7d93c5eb2fcbb850e396c8684dba74ea477d82a1bbb18932c0efb30b64ff1677f85ae833818707ac8b49ad8062ca01d2c89d8ab1843ae73e8ba9649cd28666b539444dcdee3639f95e2a099bb2")] diff --git a/src/Dapr.Testcontainers/Common/ContainerReadinessProbe.cs b/src/Dapr.Testcontainers/Common/ContainerReadinessProbe.cs new file mode 100644 index 000000000..d27a8c2bc --- /dev/null +++ b/src/Dapr.Testcontainers/Common/ContainerReadinessProbe.cs @@ -0,0 +1,214 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Net.Http; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; + +namespace Dapr.Testcontainers.Common; + +/// +/// Provides methods to poll for container readiness conditions such as TCP port +/// reachability and HTTP health endpoint availability. +/// +internal static class ContainerReadinessProbe +{ + /// + /// Polls the given TCP host/port until a connection can be established or the + /// timeout elapses. + /// + /// The host to connect to (e.g. "127.0.0.1"). + /// The TCP port to connect to. + /// Maximum time to wait before throwing . + /// Token used to cancel waiting. + /// Thrown when the port does not become reachable within . + internal static async Task WaitForTcpPortAsync( + string host, + int port, + TimeSpan timeout, + CancellationToken cancellationToken) + { + var start = DateTimeOffset.UtcNow; + Exception? lastError = null; + + while (DateTimeOffset.UtcNow - start < timeout) + { + cancellationToken.ThrowIfCancellationRequested(); + + try + { + using var client = new TcpClient(); + var connectTask = client.ConnectAsync(host, port); + + var completed = await Task.WhenAny(connectTask, + Task.Delay(TimeSpan.FromMilliseconds(250), cancellationToken)); + if (completed == connectTask) + { + // Will throw if the connection failed + await connectTask; + return; + } + } + catch (Exception ex) when (ex is SocketException or InvalidOperationException) + { + lastError = ex; + } + + await Task.Delay(TimeSpan.FromMilliseconds(200), cancellationToken); + } + + throw new TimeoutException($"Timed out waiting for TCP port {host}:{port} to accept connections.", lastError); + } + + /// + /// Polls the given HTTP until the HTTP server sends any + /// response — including error responses such as 5xx — or the timeout elapses. Only retries + /// when the underlying TCP connection is refused or the per-attempt timeout fires, meaning + /// the HTTP server is not yet listening. + /// + /// + /// Use this method when you need to verify that an HTTP server has started and is processing + /// requests without caring about application-level health status. For Dapr specifically, + /// /v1.0/healthz may return 500 while Dapr is still initializing components or while + /// a connected app has not yet started, but the server is already accepting and routing + /// requests. A single successful HTTP round-trip (regardless of status code) guarantees that + /// the HTTP and gRPC servers are both active, which eliminates the transient + /// "Connection refused" window that can occur immediately after the TCP port first opens. + /// + /// The URL to GET, e.g. "http://127.0.0.1:3500/v1.0/healthz". + /// Maximum total time to wait before throwing . + /// Token used to cancel waiting. + /// + /// Optional to use. When null a new instance is created and + /// disposed automatically. Supply a custom instance for testing purposes. + /// + /// Thrown when no HTTP response is received within . + internal static async Task WaitForHttpReachableAsync( + string url, + TimeSpan timeout, + CancellationToken cancellationToken, + HttpClient? httpClient = null) + { + var ownsClient = httpClient is null; + httpClient ??= new HttpClient(); + + try + { + var start = DateTimeOffset.UtcNow; + Exception? lastError = null; + + while (DateTimeOffset.UtcNow - start < timeout) + { + cancellationToken.ThrowIfCancellationRequested(); + + try + { + // Bound each individual attempt so a stalled connection does not exhaust the overall timeout. + using var requestCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + requestCts.CancelAfter(TimeSpan.FromSeconds(5)); + + // Any HTTP response (including 5xx) means the server is accepting connections + // and actively processing requests. + await httpClient.GetAsync(url, requestCts.Token); + return; + } + catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException or OperationCanceledException) + { + if (cancellationToken.IsCancellationRequested) + throw; + + lastError = ex; + } + + await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken); + } + + throw new TimeoutException( + $"Timed out waiting for HTTP server at {url} to start accepting connections.", lastError); + } + finally + { + if (ownsClient) + httpClient.Dispose(); + } + } + + /// + /// Polls the given HTTP until a 2xx response is received or the + /// timeout elapses. Each individual HTTP attempt is bounded by a 5-second timeout to + /// avoid stalling when the endpoint is not yet accepting connections. + /// + /// The URL to GET, e.g. "http://127.0.0.1:3500/v1.0/healthz". + /// Maximum total time to wait before throwing . + /// Token used to cancel waiting. + /// + /// Optional to use. When null a new instance is created and + /// disposed automatically. Supply a custom instance for testing purposes. + /// + /// Thrown when the endpoint does not return a 2xx response within . + internal static async Task WaitForHttpHealthAsync( + string url, + TimeSpan timeout, + CancellationToken cancellationToken, + HttpClient? httpClient = null) + { + var ownsClient = httpClient is null; + httpClient ??= new HttpClient(); + + try + { + var start = DateTimeOffset.UtcNow; + Exception? lastError = null; + + while (DateTimeOffset.UtcNow - start < timeout) + { + cancellationToken.ThrowIfCancellationRequested(); + + try + { + // Bound each individual attempt so a stalled connection does not exhaust the overall timeout. + using var requestCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + requestCts.CancelAfter(TimeSpan.FromSeconds(5)); + + var response = await httpClient.GetAsync(url, requestCts.Token); + var statusCode = (int)response.StatusCode; + if (statusCode >= 200 && statusCode < 300) + { + return; + } + + lastError = new HttpRequestException($"Health endpoint at {url} returned HTTP {statusCode}."); + } + catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException or OperationCanceledException) + { + if (cancellationToken.IsCancellationRequested) + throw; + + lastError = ex; + } + + await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken); + } + + throw new TimeoutException( + $"Timed out waiting for health endpoint {url} to return a successful response.", lastError); + } + finally + { + if (ownsClient) + httpClient.Dispose(); + } + } +} diff --git a/src/Dapr.Testcontainers/Containers/Dapr/DaprdContainer.cs b/src/Dapr.Testcontainers/Containers/Dapr/DaprdContainer.cs index 299d42604..d29d3b621 100644 --- a/src/Dapr.Testcontainers/Containers/Dapr/DaprdContainer.cs +++ b/src/Dapr.Testcontainers/Containers/Dapr/DaprdContainer.cs @@ -14,7 +14,6 @@ using System; using System.Collections.Generic; using System.Net; -using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; using Dapr.Testcontainers.Common; @@ -167,76 +166,44 @@ public DaprdContainer( /// public async Task StartAsync(CancellationToken cancellationToken = default) { - try - { - await _container.StartAsync(cancellationToken); - - var mappedHttpPort = _container.GetMappedPublicPort(InternalHttpPort); - var mappedGrpcPort = _container.GetMappedPublicPort(InternalGrpcPort); - - if (_requestedHttpPort is not null && mappedHttpPort != _requestedHttpPort.Value) - { - throw new InvalidOperationException( - $"Dapr HTTP port mapping mismatch. Requested {_requestedHttpPort.Value}, but Docker mapped {mappedHttpPort}"); - } + await _container.StartAsync(cancellationToken); - if (_requestedGrpcPort is not null && mappedGrpcPort != _requestedGrpcPort.Value) - { - throw new InvalidOperationException( - $"Dapr gRPC port mapping mismatch. Requested {_requestedGrpcPort.Value}, but Docker mapped {mappedGrpcPort}"); - } + var mappedHttpPort = _container.GetMappedPublicPort(InternalHttpPort); + var mappedGrpcPort = _container.GetMappedPublicPort(InternalGrpcPort); - HttpPort = mappedHttpPort; - GrpcPort = mappedGrpcPort; - - // The container log wait strategy can fire before the host port is actually accepting connections - // (especially on Windows). Ensure the ports are reachable from the test process. - await WaitForTcpPortAsync("127.0.0.1", HttpPort, TimeSpan.FromSeconds(30), cancellationToken); - await WaitForTcpPortAsync("127.0.0.1", GrpcPort, TimeSpan.FromSeconds(30), cancellationToken); - } - catch (Exception ex) + if (_requestedHttpPort is not null && mappedHttpPort != _requestedHttpPort.Value) { - var msg = ex.Message; - throw; + throw new InvalidOperationException( + $"Dapr HTTP port mapping mismatch. Requested {_requestedHttpPort.Value}, but Docker mapped {mappedHttpPort}"); } - } - private static async Task WaitForTcpPortAsync( - string host, - int port, - TimeSpan timeout, - CancellationToken cancellationToken) - { - var start = DateTimeOffset.UtcNow; - Exception? lastError = null; - - while (DateTimeOffset.UtcNow - start < timeout) + if (_requestedGrpcPort is not null && mappedGrpcPort != _requestedGrpcPort.Value) { - cancellationToken.ThrowIfCancellationRequested(); - - try - { - using var client = new TcpClient(); - var connectTask = client.ConnectAsync(host, port); - - var completed = await Task.WhenAny(connectTask, - Task.Delay(TimeSpan.FromMilliseconds(250), cancellationToken)); - if (completed == connectTask) - { - // Will throw if connect failed - await connectTask; - return; - } - } - catch (Exception ex) when (ex is SocketException or InvalidOperationException) - { - lastError = ex; - } - - await Task.Delay(TimeSpan.FromMilliseconds(200), cancellationToken); + throw new InvalidOperationException( + $"Dapr gRPC port mapping mismatch. Requested {_requestedGrpcPort.Value}, but Docker mapped {mappedGrpcPort}"); } - throw new TimeoutException($"Timed out waiting for TCP port {host}:{port} to accept connections.", lastError); + HttpPort = mappedHttpPort; + GrpcPort = mappedGrpcPort; + + // The container log wait strategy can fire before the host port is actually accepting connections + // (especially on Windows). Ensure the ports are reachable from the test process. + await ContainerReadinessProbe.WaitForTcpPortAsync("127.0.0.1", HttpPort, TimeSpan.FromSeconds(30), cancellationToken); + await ContainerReadinessProbe.WaitForTcpPortAsync("127.0.0.1", GrpcPort, TimeSpan.FromSeconds(30), cancellationToken); + + // Even after the TCP ports start accepting connections the Dapr runtime may still be + // initializing (connecting to Placement/Scheduler, loading components, starting the + // workflow engine). Poll the HTTP port until the Dapr HTTP server starts processing + // requests. Any HTTP response (including 5xx) confirms that the HTTP server — and by + // extension the gRPC server — is actively routing requests, eliminating the brief window + // in which the gRPC port accepts TCP connections but the gRPC handlers are not yet + // installed. This prevents the transient "Error connecting to subchannel / Connection + // refused" errors that occur when the gRPC client first connects while the runtime is + // still completing its startup sequence. + await ContainerReadinessProbe.WaitForHttpReachableAsync( + $"http://127.0.0.1:{HttpPort}/v1.0/healthz", + TimeSpan.FromSeconds(30), + cancellationToken); } /// diff --git a/test/Dapr.Testcontainers.Test/Common/ContainerReadinessProbeTests.cs b/test/Dapr.Testcontainers.Test/Common/ContainerReadinessProbeTests.cs new file mode 100644 index 000000000..107af987b --- /dev/null +++ b/test/Dapr.Testcontainers.Test/Common/ContainerReadinessProbeTests.cs @@ -0,0 +1,310 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Net; +using System.Net.Http; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Dapr.Testcontainers.Common; + +namespace Dapr.Testcontainers.Test.Common; + +public sealed class ContainerReadinessProbeTests +{ + // --------------------------------------------------------------------------- + // WaitForTcpPortAsync + // --------------------------------------------------------------------------- + + [Fact] + public async Task WaitForTcpPortAsync_Returns_WhenPortIsListening() + { + var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var port = ((IPEndPoint)listener.LocalEndpoint).Port; + + try + { + // Should complete without throwing + await ContainerReadinessProbe.WaitForTcpPortAsync( + "127.0.0.1", port, TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken); + } + finally + { + listener.Stop(); + } + } + + [Fact] + public async Task WaitForTcpPortAsync_Retries_UntilPortIsListening() + { + // Start listener slightly after we begin probing to verify that retries happen + var listener = new TcpListener(IPAddress.Loopback, 0); + listener.Start(); + var port = ((IPEndPoint)listener.LocalEndpoint).Port; + listener.Stop(); // stop immediately; we'll restart it after a delay + + var probeTask = ContainerReadinessProbe.WaitForTcpPortAsync( + "127.0.0.1", port, TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken); + + await Task.Delay(TimeSpan.FromMilliseconds(400), TestContext.Current.CancellationToken); // let a couple of probe attempts fail + var listener2 = new TcpListener(IPAddress.Loopback, port); + listener2.Start(); + + try + { + await probeTask; // should succeed now that the port is open + } + finally + { + listener2.Stop(); + } + } + + [Fact] + public async Task WaitForTcpPortAsync_ThrowsTimeoutException_WhenPortNeverListens() + { + // GetAvailablePort returns a port that is currently free (not listening) + var port = PortUtilities.GetAvailablePort(); + + await Assert.ThrowsAsync(() => + ContainerReadinessProbe.WaitForTcpPortAsync( + "127.0.0.1", port, TimeSpan.FromMilliseconds(300), TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task WaitForTcpPortAsync_ThrowsOperationCanceledException_WhenTokenCancelled() + { + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + var port = PortUtilities.GetAvailablePort(); + + await Assert.ThrowsAnyAsync(() => + ContainerReadinessProbe.WaitForTcpPortAsync( + "127.0.0.1", port, TimeSpan.FromSeconds(5), cts.Token)); + } + + // --------------------------------------------------------------------------- + // WaitForHttpReachableAsync — used by DaprdContainer to eliminate the brief + // "Connection refused" window after the TCP port first opens. + // --------------------------------------------------------------------------- + + [Fact] + public async Task WaitForHttpReachableAsync_Returns_When2xxIsReceived() + { + using var httpClient = CreateMockClient(HttpStatusCode.NoContent); // 204 + + await ContainerReadinessProbe.WaitForHttpReachableAsync( + "http://127.0.0.1:9999/v1.0/healthz", + TimeSpan.FromSeconds(5), + TestContext.Current.CancellationToken, + httpClient); + } + + [Fact] + public async Task WaitForHttpReachableAsync_Returns_When5xxIsReceived() + { + // 500 / 503 mean "server is running but not yet healthy" — the reachability + // check should return immediately rather than retrying. + using var httpClient = CreateMockClient(HttpStatusCode.InternalServerError); // 500 + + await ContainerReadinessProbe.WaitForHttpReachableAsync( + "http://127.0.0.1:9999/v1.0/healthz", + TimeSpan.FromSeconds(5), + TestContext.Current.CancellationToken, + httpClient); + } + + [Fact] + public async Task WaitForHttpReachableAsync_Returns_WhenServerFirstRefusesThenResponds() + { + // First call throws (connection refused); second returns 500 (server now running). + int callCount = 0; + var handler = new DelegateHandler(async (_, ct) => + { + callCount++; + if (callCount == 1) + throw new HttpRequestException("Simulated connection refused"); + // 500 is fine — server is up + return new HttpResponseMessage(HttpStatusCode.InternalServerError); + }); + using var httpClient = new HttpClient(handler); + + await ContainerReadinessProbe.WaitForHttpReachableAsync( + "http://127.0.0.1:9999/v1.0/healthz", + TimeSpan.FromSeconds(10), + TestContext.Current.CancellationToken, + httpClient); + + Assert.Equal(2, callCount); + } + + [Fact] + public async Task WaitForHttpReachableAsync_ThrowsTimeoutException_WhenConnectionAlwaysRefused() + { + var handler = new DelegateHandler((_, _) => + throw new HttpRequestException("Simulated connection refused")); + using var httpClient = new HttpClient(handler); + + await Assert.ThrowsAsync(() => + ContainerReadinessProbe.WaitForHttpReachableAsync( + "http://127.0.0.1:9999/v1.0/healthz", + TimeSpan.FromMilliseconds(300), + TestContext.Current.CancellationToken, + httpClient)); + } + + [Fact] + public async Task WaitForHttpReachableAsync_ThrowsOperationCanceledException_WhenTokenCancelled() + { + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + var handler = new DelegateHandler((_, _) => + throw new HttpRequestException("Simulated connection refused")); + using var httpClient = new HttpClient(handler); + + await Assert.ThrowsAnyAsync(() => + ContainerReadinessProbe.WaitForHttpReachableAsync( + "http://127.0.0.1:9999/v1.0/healthz", + TimeSpan.FromSeconds(5), + cts.Token, + httpClient)); + } + + // --------------------------------------------------------------------------- + // WaitForHttpHealthAsync — stricter check that requires a 2xx response. + // --------------------------------------------------------------------------- + + [Fact] + public async Task WaitForHttpHealthAsync_Returns_When2xxIsReceived() + { + using var httpClient = CreateMockClient(HttpStatusCode.NoContent); // 204 + + await ContainerReadinessProbe.WaitForHttpHealthAsync( + "http://127.0.0.1:9999/v1.0/healthz", + TimeSpan.FromSeconds(5), + TestContext.Current.CancellationToken, + httpClient); + } + + [Fact] + public async Task WaitForHttpHealthAsync_Returns_When200IsReceived() + { + using var httpClient = CreateMockClient(HttpStatusCode.OK); // 200 + + await ContainerReadinessProbe.WaitForHttpHealthAsync( + "http://127.0.0.1:9999/v1.0/healthz", + TimeSpan.FromSeconds(5), + TestContext.Current.CancellationToken, + httpClient); + } + + [Fact] + public async Task WaitForHttpHealthAsync_Retries_UntilSuccessful() + { + // First two calls return 503, third call returns 204 + using var httpClient = CreateMockClientWithFailures(HttpStatusCode.NoContent, failCount: 2); + + await ContainerReadinessProbe.WaitForHttpHealthAsync( + "http://127.0.0.1:9999/v1.0/healthz", + TimeSpan.FromSeconds(10), + TestContext.Current.CancellationToken, + httpClient); + } + + [Fact] + public async Task WaitForHttpHealthAsync_ThrowsTimeoutException_WhenEndpointNeverSucceeds() + { + using var httpClient = CreateMockClient(HttpStatusCode.ServiceUnavailable); // 503 forever + + await Assert.ThrowsAsync(() => + ContainerReadinessProbe.WaitForHttpHealthAsync( + "http://127.0.0.1:9999/v1.0/healthz", + TimeSpan.FromMilliseconds(300), + TestContext.Current.CancellationToken, + httpClient)); + } + + [Fact] + public async Task WaitForHttpHealthAsync_ThrowsOperationCanceledException_WhenTokenCancelled() + { + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + using var httpClient = CreateMockClient(HttpStatusCode.ServiceUnavailable); + + await Assert.ThrowsAnyAsync(() => + ContainerReadinessProbe.WaitForHttpHealthAsync( + "http://127.0.0.1:9999/v1.0/healthz", + TimeSpan.FromSeconds(5), + cts.Token, + httpClient)); + } + + [Fact] + public async Task WaitForHttpHealthAsync_Retries_WhenHttpRequestExceptionIsThrown() + { + // First call throws HttpRequestException, second call returns 204 + int callCount = 0; + var handler = new DelegateHandler(async (_, ct) => + { + callCount++; + if (callCount == 1) + throw new HttpRequestException("Simulated connection refused"); + return new HttpResponseMessage(HttpStatusCode.NoContent); + }); + using var httpClient = new HttpClient(handler); + + await ContainerReadinessProbe.WaitForHttpHealthAsync( + "http://127.0.0.1:9999/v1.0/healthz", + TimeSpan.FromSeconds(10), + TestContext.Current.CancellationToken, + httpClient); + + Assert.Equal(2, callCount); + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + private static HttpClient CreateMockClient(HttpStatusCode statusCode) + { + var handler = new DelegateHandler((_, _) => + Task.FromResult(new HttpResponseMessage(statusCode))); + return new HttpClient(handler); + } + + private static HttpClient CreateMockClientWithFailures(HttpStatusCode successCode, int failCount) + { + var callCount = 0; + var handler = new DelegateHandler((_, _) => + { + callCount++; + var code = callCount <= failCount ? HttpStatusCode.ServiceUnavailable : successCode; + return Task.FromResult(new HttpResponseMessage(code)); + }); + return new HttpClient(handler); + } + + private sealed class DelegateHandler( + Func> sendAsync) : HttpMessageHandler + { + protected override Task SendAsync( + HttpRequestMessage request, CancellationToken cancellationToken) => + sendAsync(request, cancellationToken); + } +}