diff --git a/src/Dapr.Testcontainers/Common/Testing/DaprTestApplicationBuilder.cs b/src/Dapr.Testcontainers/Common/Testing/DaprTestApplicationBuilder.cs index 13f8cc4a1..95e64a050 100644 --- a/src/Dapr.Testcontainers/Common/Testing/DaprTestApplicationBuilder.cs +++ b/src/Dapr.Testcontainers/Common/Testing/DaprTestApplicationBuilder.cs @@ -29,6 +29,7 @@ public sealed class DaprTestApplicationBuilder(BaseHarness harness) { private Action? _configureServices; private Action? _configureApp; + private bool _shouldLoadResourcesFirst = true; /// /// Configures services for the test application. @@ -48,39 +49,121 @@ public DaprTestApplicationBuilder ConfigureApp(Action configure) return this; } + /// + /// Configures the startup order of Dapr resources and the application. + /// + /// + /// If true (default), Dapr container starts before the app. If false, the + /// app starts before the Dapr container. + /// + public DaprTestApplicationBuilder WithDaprStartupOrder(bool shouldLoadResourcesFirst) + { + _shouldLoadResourcesFirst = shouldLoadResourcesFirst; + return this; + } + /// /// Builds and starts the test application and harness. /// /// public async Task BuildAndStartAsync() { - await harness.InitializeAsync(); - WebApplication? app = null; - if (_configureServices is not null || _configureApp is not null) + + if (_shouldLoadResourcesFirst) { - var builder = WebApplication.CreateBuilder(); - - // Configure Dapr endpoints via in-memory configuration instead of environment variables - builder.Configuration.AddInMemoryCollection(new Dictionary + // Load the harness and resources, then the app + await harness.InitializeAsync(); + + if (_configureServices is not null || _configureApp is not null) { - { "DAPR_HTTP_ENDPOINT", $"http://127.0.0.1:{harness.DaprHttpPort}" }, - { "DAPR_GRPC_ENDPOINT", $"http://127.0.0.1:{harness.DaprGrpcPort}" } - }); - - builder.Logging.ClearProviders(); - builder.Logging.AddSimpleConsole(); - builder.WebHost.UseUrls($"http://0.0.0.0:{harness.AppPort}"); - - _configureServices?.Invoke(builder); + app = CreateApp(); + await app.StartAsync(); + } - app = builder.Build(); - - _configureApp?.Invoke(app); + return new DaprTestApplication(harness, app); + } + + // App-first: start app, then start resources + // If daprd cannot bind the chosen ports, restart the app with new ports + const int maxAttempts = 5; + Exception? lastError = null; + + for (var attempt = 1; attempt <= maxAttempts; attempt++) + { + WebApplication? attemptApp = null; + + try + { + // Pre-assign prots for the app knows where Dapr will be (avoid collisions) + var httpPort = PortUtilities.GetAvailablePort(); + var grpcPort = PortUtilities.GetAvailablePort(); + while (grpcPort == httpPort) + grpcPort = PortUtilities.GetAvailablePort(); + + var appPort = PortUtilities.GetAvailablePort(); + while (appPort == httpPort || appPort == grpcPort) + appPort = PortUtilities.GetAvailablePort(); + + harness.SetPorts(httpPort, grpcPort); + harness.SetAppPort(appPort); + + // Load the app (configuration/services/pipeline), but delay StartAsync until daprd is up + if (_configureServices is not null || _configureApp is not null) + { + attemptApp = CreateApp(); + await attemptApp.StartAsync(); + } + + await harness.InitializeAsync(); + + return new DaprTestApplication(harness, attemptApp); + } + catch (Exception ex) + { + lastError = ex; - await app.StartAsync(); + if (attemptApp is not null) + { + try + { + await attemptApp.StopAsync(); + } + finally + { + await attemptApp.DisposeAsync(); + } + } + + // Try again with a frest set of ports + } } - return new DaprTestApplication(harness, app); + throw new InvalidOperationException( + $"Failed to start app-first Dapr test application after {maxAttempts} attempts.", lastError); + } + + private WebApplication CreateApp() + { + var builder = WebApplication.CreateBuilder(); + + // Configure Dapr endpoints via in-memory configuration instead of environment variables + builder.Configuration.AddInMemoryCollection(new Dictionary + { + { "DAPR_HTTP_ENDPOINT", $"http://127.0.0.1:{harness.DaprHttpPort}" }, + { "DAPR_GRPC_ENDPOINT", $"http://127.0.0.1:{harness.DaprGrpcPort}" } + }); + + builder.Logging.ClearProviders(); + builder.Logging.AddSimpleConsole(); + builder.WebHost.UseUrls($"http://0.0.0.0:{harness.AppPort}"); + + _configureServices?.Invoke(builder); + + var app = builder.Build(); + + _configureApp?.Invoke(app); + + return app; } } diff --git a/src/Dapr.Testcontainers/Containers/Dapr/DaprdContainer.cs b/src/Dapr.Testcontainers/Containers/Dapr/DaprdContainer.cs index 5a9a5cda0..1bf1dfe4d 100644 --- a/src/Dapr.Testcontainers/Containers/Dapr/DaprdContainer.cs +++ b/src/Dapr.Testcontainers/Containers/Dapr/DaprdContainer.cs @@ -13,6 +13,7 @@ using System; using System.Collections.Generic; +using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; using Dapr.Testcontainers.Common; @@ -48,6 +49,9 @@ public sealed class DaprdContainer : IAsyncStartable /// public int GrpcPort { get; private set; } + private readonly int? _requestedHttpPort; + private readonly int? _requestedGrpcPort; + /// /// The hostname to locate the Dapr runtime on in the shared Docker network. /// @@ -62,8 +66,22 @@ public sealed class DaprdContainer : IAsyncStartable /// The shared Docker network to connect to. /// The hostname and port of the Placement service. /// The hostname and port of the Scheduler service. - public DaprdContainer(string appId, string componentsHostFolder, DaprRuntimeOptions options, INetwork network, HostPortPair? placementHostAndPort = null, HostPortPair? schedulerHostAndPort = null) + /// The host HTTP port to bind to. + /// The host gRPC port to bind to. + public DaprdContainer( + string appId, + string componentsHostFolder, + DaprRuntimeOptions options, + INetwork network, + HostPortPair? placementHostAndPort = null, + HostPortPair? schedulerHostAndPort = null, + int? daprHttpPort = null, + int? daprGrpcPort = null + ) { + _requestedHttpPort = daprHttpPort; + _requestedGrpcPort = daprGrpcPort; + const string componentsPath = "/components"; var cmd = new List @@ -102,28 +120,89 @@ public DaprdContainer(string appId, string componentsHostFolder, DaprRuntimeOpti cmd.Add(""); } - _container = new ContainerBuilder() + var containerBuilder = new ContainerBuilder() .WithImage(options.RuntimeImageTag) .WithName(_containerName) .WithLogger(ConsoleLogger.Instance) .WithCommand(cmd.ToArray()) .WithNetwork(network) .WithExtraHost(ContainerHostAlias, "host-gateway") - .WithPortBinding(InternalHttpPort, assignRandomHostPort: true) - .WithPortBinding(InternalGrpcPort, assignRandomHostPort: true) .WithBindMount(componentsHostFolder, componentsPath, AccessMode.ReadOnly) .WithWaitStrategy(Wait.ForUnixContainer() - .UntilMessageIsLogged("Internal gRPC server is running")) + .UntilMessageIsLogged("Internal gRPC server is running")); //.UntilMessageIsLogged(@"^dapr initialized. Status: Running. Init Elapsed ")) - .Build(); + + containerBuilder = daprHttpPort is not null ? containerBuilder.WithPortBinding(containerPort: InternalHttpPort, hostPort: daprHttpPort.Value) : containerBuilder.WithPortBinding(port: InternalHttpPort, assignRandomHostPort: true); + containerBuilder = daprGrpcPort is not null ? containerBuilder.WithPortBinding(containerPort: InternalGrpcPort, hostPort: daprGrpcPort.Value) : containerBuilder.WithPortBinding(port: InternalGrpcPort, assignRandomHostPort: true); + + _container = containerBuilder.Build(); } /// public async Task StartAsync(CancellationToken cancellationToken = default) { await _container.StartAsync(cancellationToken); - HttpPort = _container.GetMappedPublicPort(InternalHttpPort); - GrpcPort = _container.GetMappedPublicPort(InternalGrpcPort); + + var mappedHttpPort = _container.GetMappedPublicPort(InternalHttpPort); + var mappedGrpcPort = _container.GetMappedPublicPort(InternalGrpcPort); + + if (_requestedHttpPort is not null && mappedHttpPort != _requestedHttpPort.Value) + { + throw new InvalidOperationException( + $"Dapr HTTP port mapping mismatch. Requested {_requestedHttpPort.Value}, but Docker mapped {mappedHttpPort}"); + } + + if (_requestedGrpcPort is not null && mappedGrpcPort != _requestedGrpcPort.Value) + { + throw new InvalidOperationException( + $"Dapr gRPC port mapping mismatch. Requested {_requestedGrpcPort.Value}, but Docker mapped {mappedGrpcPort}"); + } + + HttpPort = mappedHttpPort; + GrpcPort = mappedGrpcPort; + + // The container log wait strategy can fire before the host port is actually accepting connections + // (especially on Windows). Ensure the ports are reachable from the test process. + await WaitForTcpPortAsync("127.0.0.1", HttpPort, TimeSpan.FromSeconds(30), cancellationToken); + await WaitForTcpPortAsync("127.0.0.1", GrpcPort, TimeSpan.FromSeconds(30), cancellationToken); + } + + private static async Task WaitForTcpPortAsync( + string host, + int port, + TimeSpan timeout, + CancellationToken cancellationToken) + { + var start = DateTimeOffset.UtcNow; + Exception? lastError = null; + + while (DateTimeOffset.UtcNow - start < timeout) + { + cancellationToken.ThrowIfCancellationRequested(); + + try + { + using var client = new TcpClient(); + var connectTask = client.ConnectAsync(host, port); + + var completed = await Task.WhenAny(connectTask, + Task.Delay(TimeSpan.FromMilliseconds(250), cancellationToken)); + if (completed == connectTask) + { + // Will throw if connect failed + await connectTask; + return; + } + } + catch (Exception ex) when (ex is SocketException or InvalidOperationException) + { + lastError = ex; + } + + await Task.Delay(TimeSpan.FromMilliseconds(200), cancellationToken); + } + + throw new TimeoutException($"Timed out waiting for TCP port {host}:{port} to accept connections.", lastError); } /// diff --git a/src/Dapr.Testcontainers/Harnesses/BaseHarness.cs b/src/Dapr.Testcontainers/Harnesses/BaseHarness.cs index 45b0f509a..b8592cf85 100644 --- a/src/Dapr.Testcontainers/Harnesses/BaseHarness.cs +++ b/src/Dapr.Testcontainers/Harnesses/BaseHarness.cs @@ -30,10 +30,14 @@ public abstract class BaseHarness : IAsyncContainerFixture /// The Daprd container exposed by the harness. /// private protected DaprdContainer? _daprd; + + private int? _daprHttpPortOverride; + private int? _daprGrpcPortOverride; + /// /// Indicates whether the sidecar ports are ready for use. /// - private readonly TaskCompletionSource _sidecarPortsReady = new(TaskCreationOptions.RunContinuationsAsynchronously); + private TaskCompletionSource _sidecarPortsReady = new(TaskCreationOptions.RunContinuationsAsynchronously); /// /// The isolated network and Dapr environment this harness is using. /// @@ -42,6 +46,14 @@ public abstract class BaseHarness : IAsyncContainerFixture /// Tracks whether the environment was created implicitly (standalone mode) or is shared (controlling whether we dispose of it). /// private readonly bool _ownsEnvironment; + /// + /// The HTTP port used by the Daprd container. + /// + public int DaprHttpPort => _daprd?.HttpPort ?? _daprHttpPortOverride ?? 0; + /// + /// The gRPC port used by the Daprd container. + /// + public int DaprGrpcPort => _daprd?.GrpcPort ?? _daprGrpcPortOverride ?? 0; private readonly string componentsDirectory; private readonly Func? startApp; @@ -86,20 +98,22 @@ protected BaseHarness(string componentsDirectory, Func? startApp, Dap public int AppPort { get; private set; } /// - /// The HTTP port used by the Daprd container. + /// Pre-assigns teh application port to use. This is useful when the app is created before the Dapr container and + /// other resources. /// - public int DaprHttpPort => _daprd?.HttpPort ?? 0; + /// The app port. + public void SetAppPort(int appPort) + { + ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(appPort, 0, nameof(appPort)); + AppPort = appPort; + } + /// /// The HTTP endpoint used by the Daprd container. /// public string DaprHttpEndpoint => $"http://{DaprdContainer.ContainerHostAlias}:{DaprHttpPort}"; - /// - /// The gRPC port used by the Daprd container. - /// - public int DaprGrpcPort => _daprd?.GrpcPort ?? 0; - /// /// The gRPC endpoint used by the Daprd container. /// @@ -129,6 +143,17 @@ protected BaseHarness(string componentsDirectory, Func? startApp, Dap /// The network alias of the scheduler container, if started. /// protected string? DaprSchedulerAlias { get; set; } + + /// + /// Pre-assigns the Dapr ports to use. This is useful when the app starts before the Dapr container. + /// + /// The HTTP port. + /// The gRPC port. + public void SetPorts(int httpPort, int grpcPort) + { + _daprHttpPortOverride = httpPort; + _daprGrpcPortOverride = grpcPort; + } /// /// The specific container startup logic for the harness. @@ -143,6 +168,16 @@ protected BaseHarness(string componentsDirectory, Func? startApp, Dap /// public async Task InitializeAsync(CancellationToken cancellationToken = default) { + // Allow InitializeAsync to be called multiple times (used by app-first retries) + // Reset readiness gate and dispose previous daprd if any + _sidecarPortsReady = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + if (_daprd is not null) + { + await _daprd.DisposeAsync(); + _daprd = null; + } + // Ensure the environment infrastructure is running // If we own it, we start it. If it's shared, the caller usually starts it // but calling StartAsync is idempotent, so it's safe to ensure it here @@ -151,9 +186,13 @@ public async Task InitializeAsync(CancellationToken cancellationToken = default) // Run the actual container orchestration defined in the subclass to set up any pre-requisite containers // before loading daprd and the start app, if specified await OnInitializeAsync(cancellationToken); - - // PIck a port only when we are ready to start, or use 0 to let the OS decide - this.AppPort = PortUtilities.GetAvailablePort(); + + // Pick a port only when we are ready to start, or use 0 to let the OS decide. + // If the app port was pre-assigned (app constructored/configured first), keep it. + if (this.AppPort <= 0) + { + this.AppPort = PortUtilities.GetAvailablePort(); + } // Configure and start daprd; point at placement & scheduler _daprd = new DaprdContainer( @@ -164,7 +203,9 @@ public async Task InitializeAsync(CancellationToken cancellationToken = default) DaprPlacementExternalPort is null || DaprPlacementAlias is null ? null : new HostPortPair(DaprPlacementAlias, DaprPlacementContainer.InternalPort), DaprSchedulerExternalPort is null || DaprSchedulerAlias is null - ? null : new HostPortPair(DaprSchedulerAlias, DaprSchedulerContainer.InternalPort)); + ? null : new HostPortPair(DaprSchedulerAlias, DaprSchedulerContainer.InternalPort), + _daprHttpPortOverride, + _daprGrpcPortOverride); var daprdTask = Task.Run(async () => { @@ -178,7 +219,6 @@ DaprSchedulerExternalPort is null || DaprSchedulerAlias is null appTask = Task.Run(async () => { await _sidecarPortsReady.Task.WaitAsync(cancellationToken); - await startApp(AppPort); }, cancellationToken); } diff --git a/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs b/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs index f7af36fdb..e5f0410f0 100644 --- a/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs +++ b/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs @@ -26,6 +26,8 @@ namespace Dapr.Workflow.Worker.Grpc; /// internal sealed class GrpcProtocolHandler(TaskHubSidecarService.TaskHubSidecarServiceClient grpcClient, ILoggerFactory loggerFactory, int maxConcurrentWorkItems = 100, int maxConcurrentActivities = 100) : IAsyncDisposable { + private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5); + private readonly CancellationTokenSource _disposalCts = new(); private readonly ILogger _logger = loggerFactory?.CreateLogger() ?? throw new ArgumentNullException(nameof(loggerFactory)); private readonly TaskHubSidecarService.TaskHubSidecarServiceClient _grpcClient = @@ -49,32 +51,71 @@ public async Task StartAsync( { using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposalCts.Token); var token = linkedCts.Token; + + // Establish the bidirectional stream + var request = new GetWorkItemsRequest + { + MaxConcurrentOrchestrationWorkItems = _maxConcurrentWorkItems, + MaxConcurrentActivityWorkItems = _maxConcurrentActivities + }; - try + while (!token.IsCancellationRequested) { - _logger.LogGrpcProtocolHandlerStartStream(); + try + { + _logger.LogGrpcProtocolHandlerStartStream(); + + // Establish the server streaming call + _streamingCall = _grpcClient.GetWorkItems(request, cancellationToken: token); - // Establish the bidirectional stream - var request = new GetWorkItemsRequest + // Process work items from the stream + await ReceiveLoopAsync(_streamingCall.ResponseStream, workflowHandler, activityHandler, token); + + // Stream ended gracefully => tradfe as an interrupted and reconnect unless shutting down + if (!token.IsCancellationRequested) + { + await DelayOrStopAsync(ReconnectDelay, token); + } + } + catch (OperationCanceledException) when (token.IsCancellationRequested) { - MaxConcurrentOrchestrationWorkItems = _maxConcurrentWorkItems, - MaxConcurrentActivityWorkItems = _maxConcurrentActivities - }; - - // Establish the server streaming call - _streamingCall = _grpcClient.GetWorkItems(request, cancellationToken: token); - - // Process work items from the stream - await ReceiveLoopAsync(_streamingCall.ResponseStream, workflowHandler, activityHandler, token); - } - catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled) + _logger.LogGrpcProtocolHandlerStreamCanceled(); + break; + } + catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled && token.IsCancellationRequested) + { + _logger.LogGrpcProtocolHandlerStreamCanceled(); + break; + } + catch (RpcException ex) when (!token.IsCancellationRequested) + { + // Any RpcException while not shutting down -> retry indefinitely (transient or not) + _logger.LogGrpcProtocolHandlerGenericError(ex); + await DelayOrStopAsync(ReconnectDelay, token); + } + catch (Exception ex) when (!token.IsCancellationRequested) + { + // Any other interruption -> retry indefinitely + _logger.LogGrpcProtocolHandlerGenericError(ex); + await DelayOrStopAsync(ReconnectDelay, token); + } + finally + { + _streamingCall?.Dispose(); + _streamingCall = null; + } + } + } + + private static async Task DelayOrStopAsync(TimeSpan delay, CancellationToken token) + { + try { - _logger.LogGrpcProtocolHandlerStreamCanceled(); + await Task.Delay(delay, token); } - catch (Exception ex) + catch (OperationCanceledException) when (token.IsCancellationRequested) { - _logger.LogGrpcProtocolHandlerGenericError(ex); - throw; + // Swallow cancellation so StartAsync exits cleanly when the host/test cancels. } } diff --git a/test/Dapr.IntegrationTest.Workflow/SimpleWorkflowTests.cs b/test/Dapr.IntegrationTest.Workflow/SimpleWorkflowTests.cs index c4ef7f637..df1eb53a7 100644 --- a/test/Dapr.IntegrationTest.Workflow/SimpleWorkflowTests.cs +++ b/test/Dapr.IntegrationTest.Workflow/SimpleWorkflowTests.cs @@ -22,8 +22,10 @@ namespace Dapr.IntegrationTest.Workflow; public sealed class SimpleWorkflowTests { - [Fact] - public async Task ShouldHandleSimpleWorkflow() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task ShouldHandleSimpleWorkflow(bool loadResourcesFirst) { var options = new DaprRuntimeOptions(); var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); @@ -50,6 +52,7 @@ public async Task ShouldHandleSimpleWorkflow() clientBuilder.UseGrpcEndpoint(grpcEndpoint); }); }) + .WithDaprStartupOrder(loadResourcesFirst) // Used to change the startup order of Dapr and the app .BuildAndStartAsync(); // Clean test logic diff --git a/test/Dapr.Testcontainers.Test/Harnesses/StartupOrderTests.cs b/test/Dapr.Testcontainers.Test/Harnesses/StartupOrderTests.cs new file mode 100644 index 000000000..bc023ce2d --- /dev/null +++ b/test/Dapr.Testcontainers.Test/Harnesses/StartupOrderTests.cs @@ -0,0 +1,82 @@ +using Dapr.Testcontainers.Common; +using Dapr.Testcontainers.Common.Options; +using Dapr.Testcontainers.Harnesses; + +namespace Dapr.Testcontainers.Test.Harnesses; + +public sealed class StartupOrderTests +{ + [Fact] + public async Task AppStartsFirst_ShouldPreconfigurePorts() + { + var options = new DaprRuntimeOptions(); + var environment = new DaprTestEnvironment(options); + + // We need a concrete harness to test BaseHarness logic + var harness = new TestHarness(options, environment); + var builder = DaprHarnessBuilder.ForHarness(harness); + + // Configure it to load resources *after* the app (meaning App loads first) + builder.WithDaprStartupOrder(shouldLoadResourcesFirst: false); + + // We mock the app startup to verify ports are set + builder.ConfigureApp(app => + { + // Verify ports are assigned before the app fully starts/harness initializes + Assert.True(harness.DaprHttpPort > 0, "HTTP port should be pre-assigned"); + Assert.True(harness.DaprGrpcPort > 0, "gRPC port should be pre-assigned"); + }); + + // Use a dummy app start just to trigger the build flow, but we won't actually run a real web host for long + // This is a bit of a partial integration test because BuildAndStartAsync does real work. + // For pure unit testing, we'd need to mock more, but this validates the flow. + + await using var app = await builder.BuildAndStartAsync(); + + Assert.True(harness.DaprHttpPort > 0); + Assert.True(harness.DaprGrpcPort > 0); + } + + [Fact] + public void ResourcesStartFirst_ShouldNotPreconfigurePorts() + { + var options = new DaprRuntimeOptions(); + var environment = new DaprTestEnvironment(options); + var harness = new TestHarness(options, environment); + + var builder = DaprHarnessBuilder.ForHarness(harness); + + // Default behavior (Resources first) + builder.WithDaprStartupOrder(shouldLoadResourcesFirst: true); + + // Before we build, ports should be 0 + Assert.Equal(0, harness.DaprHttpPort); + Assert.Equal(0, harness.DaprGrpcPort); + + // We can't easily assert the "during startup" state without hooks, + // but we can verify the end result is valid. + + // NOTE: Running this fully might try to spin up containers. + // If we want to avoid that, we can rely on the fact that `SetPorts` wasn't called. + // But let's at least verify the builder state didn't mutate the harness prematurely. + + Assert.Equal(0, harness.DaprHttpPort); + } + + // Concrete implementation for testing BaseHarness + private class TestHarness : BaseHarness + { + + + public TestHarness(DaprRuntimeOptions options, DaprTestEnvironment environment) + : base(TestDirectoryManager.CreateTestDirectory("test-components"), null, options, environment) + { + ; + } + + protected override Task OnInitializeAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + } +} diff --git a/test/Dapr.Workflow.Test/Worker/Grpc/GrpcProtocolHandlerTests.cs b/test/Dapr.Workflow.Test/Worker/Grpc/GrpcProtocolHandlerTests.cs index 229baa310..b40eb3eb9 100644 --- a/test/Dapr.Workflow.Test/Worker/Grpc/GrpcProtocolHandlerTests.cs +++ b/test/Dapr.Workflow.Test/Worker/Grpc/GrpcProtocolHandlerTests.cs @@ -11,6 +11,7 @@ // limitations under the License. // ------------------------------------------------------------------------ +using System.Diagnostics; using Dapr.DurableTask.Protobuf; using Dapr.Workflow.Worker.Grpc; using Grpc.Core; @@ -19,8 +20,72 @@ namespace Dapr.Workflow.Test.Worker.Grpc; -public class GrpcProtocolHandlerTests +public sealed class GrpcProtocolHandlerTests { + private static TaskCompletionSource CreateTcs() => + new(TaskCreationOptions.RunContinuationsAsynchronously); + + private static TaskCompletionSource CreateTcs() => new(TaskCreationOptions.RunContinuationsAsynchronously); + + private static async Task RunHandlerUntilAsync( + GrpcProtocolHandler handler, + Func> workflowHandler, + Func> activityHandler, + Task until, + TimeSpan timeout) + { + using var cts = new CancellationTokenSource(timeout); + var runTask = handler.StartAsync(workflowHandler, activityHandler, cts.Token); + try + { + await until.WaitAsync(cts.Token); + } + finally + { + // Always stop the infinite loop so the test can end + cts.Cancel(); + await runTask; + } + } + + private static async Task RunHandlerUntilAsync( + GrpcProtocolHandler handler, + Func> workflowHandler, + Func> activityHandler, + Func untilCondition, + TimeSpan timeout, + TimeSpan? pollInterval = null) + { + pollInterval ??= TimeSpan.FromMilliseconds(10); + + using var cts = new CancellationTokenSource(timeout); + + var runTask = handler.StartAsync(workflowHandler, activityHandler, cts.Token); + + try + { + var sw = Stopwatch.StartNew(); + + while (!untilCondition()) + { + cts.Token.ThrowIfCancellationRequested(); + + // Cheap polling; avoids needing TCS in every test. + await Task.Delay(pollInterval.Value, cts.Token); + + if (sw.Elapsed >= timeout) + { + throw new TimeoutException($"Condition was not met within {timeout}."); + } + } + } + finally + { + cts.Cancel(); + await runTask; + } + } + [Fact] public void Constructor_ShouldThrowArgumentNullException_WhenLoggerFactoryIsNull() { @@ -66,20 +131,23 @@ public async Task StartAsync_ShouldCompleteOrchestratorTask_ForOrchestratorWorkI .Setup(x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny())) .Returns(CreateServerStreamingCall(workItems)); - OrchestratorResponse? completed = null; + var completedTcs = CreateTcs(); + grpcClientMock .Setup(x => x.CompleteOrchestratorTaskAsync(It.IsAny(), null, null, It.IsAny())) - .Callback((r, _, _, _) => completed = r) + .Callback((r, _, _, _) => completedTcs.TrySetResult(r)) .Returns(CreateAsyncUnaryCall(new CompleteTaskResponse())); var handler = new GrpcProtocolHandler(grpcClientMock.Object, NullLoggerFactory.Instance); - await handler.StartAsync( + await RunHandlerUntilAsync( + handler, workflowHandler: req => Task.FromResult(new OrchestratorResponse { InstanceId = req.InstanceId }), activityHandler: _ => Task.FromResult(new ActivityResponse()), - cancellationToken: CancellationToken.None); + until: completedTcs.Task, + timeout: TimeSpan.FromSeconds(2)); - Assert.NotNull(completed); + var completed = await completedTcs.Task; Assert.Equal("i-1", completed!.InstanceId); } @@ -105,21 +173,29 @@ public async Task StartAsync_ShouldCompleteActivityTask_ForActivityWorkItem() .Setup(x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny())) .Returns(CreateServerStreamingCall(workItems)); - ActivityResponse? completed = null; + var completedTcs = CreateTcs(); + grpcClientMock .Setup(x => x.CompleteActivityTaskAsync(It.IsAny(), null, null, It.IsAny())) - .Callback((r, _, _, _) => completed = r) + .Callback((r, _, _, _) => completedTcs.TrySetResult(r)) .Returns(CreateAsyncUnaryCall(new CompleteTaskResponse())); var handler = new GrpcProtocolHandler(grpcClientMock.Object, NullLoggerFactory.Instance); - await handler.StartAsync( + await RunHandlerUntilAsync( + handler, workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), - activityHandler: req => Task.FromResult(new ActivityResponse { InstanceId = req.OrchestrationInstance.InstanceId, TaskId = req.TaskId, Result = "ok" }), - cancellationToken: CancellationToken.None); - - Assert.NotNull(completed); - Assert.Equal("i-2", completed!.InstanceId); + activityHandler: req => Task.FromResult(new ActivityResponse + { + InstanceId = req.OrchestrationInstance.InstanceId, + TaskId = req.TaskId, + Result = "ok" + }), + until: completedTcs.Task, + timeout: TimeSpan.FromSeconds(2)); + + var completed = await completedTcs.Task; + Assert.Equal("i-2", completed.InstanceId); Assert.Equal(42, completed.TaskId); Assert.Equal("ok", completed.Result); } @@ -141,21 +217,25 @@ public async Task StartAsync_ShouldSendFailureResult_WhenOrchestratorHandlerThro .Setup(x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny())) .Returns(CreateServerStreamingCall(workItems)); - OrchestratorResponse? completed = null; + var completedTcs = CreateTcs(); + grpcClientMock .Setup(x => x.CompleteOrchestratorTaskAsync(It.IsAny(), null, null, It.IsAny())) - .Callback((r, _, _, _) => completed = r) + .Callback((r, _, _, _) => completedTcs.TrySetResult(r)) .Returns(CreateAsyncUnaryCall(new CompleteTaskResponse())); var handler = new GrpcProtocolHandler(grpcClientMock.Object, NullLoggerFactory.Instance); - await handler.StartAsync( + await RunHandlerUntilAsync( + handler, workflowHandler: _ => throw new InvalidOperationException("boom"), activityHandler: _ => Task.FromResult(new ActivityResponse()), - cancellationToken: CancellationToken.None); + until: completedTcs.Task, + timeout: TimeSpan.FromSeconds(2)); - Assert.NotNull(completed); - Assert.Equal("i-err", completed!.InstanceId); + var completed = await completedTcs.Task; + + Assert.Equal("i-err", completed.InstanceId); Assert.Single(completed.Actions); Assert.NotNull(completed.Actions[0].CompleteOrchestration); Assert.Equal(OrchestrationStatus.Failed, completed.Actions[0].CompleteOrchestration.OrchestrationStatus); @@ -188,12 +268,18 @@ public async Task StartAsync_ShouldSendGetWorkItemsRequest_WithConfiguredConcurr return CreateServerStreamingCall(Array.Empty()); }); - var handler = new GrpcProtocolHandler(grpcClientMock.Object, NullLoggerFactory.Instance, maxConcurrentWorkItems: 7, maxConcurrentActivities: 9); + var handler = new GrpcProtocolHandler( + grpcClientMock.Object, + NullLoggerFactory.Instance, + maxConcurrentWorkItems: 7, + maxConcurrentActivities: 9); - await handler.StartAsync( + await RunHandlerUntilAsync( + handler, workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), activityHandler: _ => Task.FromResult(new ActivityResponse()), - cancellationToken: CancellationToken.None); + untilCondition: () => captured is not null, + timeout: TimeSpan.FromSeconds(2)); Assert.NotNull(captured); Assert.Equal(7, captured!.MaxConcurrentOrchestrationWorkItems); @@ -211,10 +297,20 @@ public async Task StartAsync_ShouldReturnWithoutThrowing_WhenGrpcStreamIsCancell var handler = new GrpcProtocolHandler(grpcClientMock.Object, NullLoggerFactory.Instance); + // With "retry forever unless shutting down" semantics, the graceful shutdown signal + // is cancellation. If the token is already canceled, StartAsync should exit immediately. + using var cts = new CancellationTokenSource(); + cts.Cancel(); + await handler.StartAsync( workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), activityHandler: _ => Task.FromResult(new ActivityResponse()), - cancellationToken: CancellationToken.None); + cancellationToken: cts.Token); + + // Since we were already canceled, we shouldn't even attempt to connect. + grpcClientMock.Verify( + x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny()), + Times.Never()); } [Fact] @@ -239,21 +335,25 @@ public async Task StartAsync_ShouldSendActivityFailureResult_WhenActivityHandler .Setup(x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny())) .Returns(CreateServerStreamingCall(workItems)); - ActivityResponse? sent = null; + var sentTcs = CreateTcs(); + grpcClientMock .Setup(x => x.CompleteActivityTaskAsync(It.IsAny(), null, null, It.IsAny())) - .Callback((r, _, _, _) => sent = r) + .Callback((r, _, _, _) => sentTcs.TrySetResult(r)) .Returns(CreateAsyncUnaryCall(new CompleteTaskResponse())); var handler = new GrpcProtocolHandler(grpcClientMock.Object, NullLoggerFactory.Instance); - await handler.StartAsync( + await RunHandlerUntilAsync( + handler, workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), activityHandler: _ => throw new InvalidOperationException("boom"), - cancellationToken: CancellationToken.None); + until: sentTcs.Task, + timeout: TimeSpan.FromSeconds(2)); + + var sent = await sentTcs.Task; - Assert.NotNull(sent); - Assert.Equal("i-1", sent!.InstanceId); + Assert.Equal("i-1", sent.InstanceId); Assert.Equal(0, sent.TaskId); Assert.NotNull(sent.FailureDetails); Assert.Contains(nameof(InvalidOperationException), sent.FailureDetails.ErrorType); @@ -282,16 +382,23 @@ public async Task StartAsync_ShouldNotThrow_WhenSendingActivityFailureResultAlso .Setup(x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny())) .Returns(CreateServerStreamingCall(workItems)); + var completeAttempted = false; + grpcClientMock .Setup(x => x.CompleteActivityTaskAsync(It.IsAny(), null, null, It.IsAny())) + .Callback(() => completeAttempted = true) .Throws(new RpcException(new Status(StatusCode.Unavailable, "nope"))); var handler = new GrpcProtocolHandler(grpcClientMock.Object, NullLoggerFactory.Instance); - await handler.StartAsync( + await RunHandlerUntilAsync( + handler, workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), activityHandler: _ => throw new Exception("boom"), - cancellationToken: CancellationToken.None); + untilCondition: () => completeAttempted, + timeout: TimeSpan.FromSeconds(2)); + + Assert.True(completeAttempted); } [Fact] @@ -316,21 +423,25 @@ public async Task StartAsync_ShouldUseCreateActivityFailureResult_WithNullStackT .Setup(x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny())) .Returns(CreateServerStreamingCall(workItems)); - ActivityResponse? sent = null; + var sentTcs = CreateTcs(); + grpcClientMock .Setup(x => x.CompleteActivityTaskAsync(It.IsAny(), null, null, It.IsAny())) - .Callback((r, _, _, _) => sent = r) + .Callback((r, _, _, _) => sentTcs.TrySetResult(r)) .Returns(CreateAsyncUnaryCall(new CompleteTaskResponse())); var handler = new GrpcProtocolHandler(grpcClientMock.Object, NullLoggerFactory.Instance); - await handler.StartAsync( + await RunHandlerUntilAsync( + handler, workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), activityHandler: _ => throw new NullStackTraceException("boom"), - cancellationToken: CancellationToken.None); + until: sentTcs.Task, + timeout: TimeSpan.FromSeconds(2)); + + var sent = await sentTcs.Task; - Assert.NotNull(sent); - Assert.NotNull(sent!.FailureDetails); + Assert.NotNull(sent.FailureDetails); Assert.Null(sent.FailureDetails.StackTrace); Assert.Contains("boom", sent.FailureDetails.ErrorMessage); } @@ -357,26 +468,28 @@ public async Task StartAsync_ShouldNotCallCompleteActivityTask_WhenActivityHandl .Setup(x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny())) .Returns(CreateServerStreamingCallIgnoringCancellation(workItems)); - var completeCalled = false; grpcClientMock .Setup(x => x.CompleteActivityTaskAsync(It.IsAny(), null, null, It.IsAny())) - .Callback(() => completeCalled = true) .Returns(CreateAsyncUnaryCall(new CompleteTaskResponse())); var handler = new GrpcProtocolHandler(grpcClientMock.Object, NullLoggerFactory.Instance); - using var cts = new CancellationTokenSource(); + // Add a safety timeout so this test can never hang, even if behavior regresses. + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); await handler.StartAsync( workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), activityHandler: _ => { - cts.Cancel(); // make StartAsync's linked token "IsCancellationRequested == true" + // Make StartAsync's linked token "IsCancellationRequested == true" + cts.Cancel(); throw new OperationCanceledException(cts.Token); }, cancellationToken: cts.Token); - Assert.False(completeCalled); + grpcClientMock.Verify( + x => x.CompleteActivityTaskAsync(It.IsAny(), null, null, It.IsAny()), + Times.Never()); } [Fact] @@ -396,15 +509,14 @@ public async Task StartAsync_ShouldNotCallCompleteOrchestratorTask_WhenWorkflowH .Setup(x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny())) .Returns(CreateServerStreamingCallIgnoringCancellation(workItems)); - var completeCalled = false; grpcClientMock .Setup(x => x.CompleteOrchestratorTaskAsync(It.IsAny(), null, null, It.IsAny())) - .Callback(() => completeCalled = true) .Returns(CreateAsyncUnaryCall(new CompleteTaskResponse())); var handler = new GrpcProtocolHandler(grpcClientMock.Object, NullLoggerFactory.Instance); - using var cts = new CancellationTokenSource(); + // Add a safety timeout so this test can never hang, even if behavior regresses. + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); await handler.StartAsync( workflowHandler: _ => @@ -415,7 +527,9 @@ await handler.StartAsync( activityHandler: _ => Task.FromResult(new ActivityResponse()), cancellationToken: cts.Token); - Assert.False(completeCalled); + grpcClientMock.Verify( + x => x.CompleteOrchestratorTaskAsync(It.IsAny(), null, null, It.IsAny()), + Times.Never()); } [Fact] @@ -442,14 +556,25 @@ public async Task StartAsync_ShouldCleanupCompletedActiveWorkItems_WhenActiveWor .Callback(() => Interlocked.Increment(ref completedCount)) .Returns(CreateAsyncUnaryCall(new CompleteTaskResponse())); - var handler = new GrpcProtocolHandler(grpcClientMock.Object, NullLoggerFactory.Instance, maxConcurrentWorkItems: 1, maxConcurrentActivities: 1); + var handler = new GrpcProtocolHandler( + grpcClientMock.Object, + NullLoggerFactory.Instance, + maxConcurrentWorkItems: 1, + maxConcurrentActivities: 1); - await handler.StartAsync( + await RunHandlerUntilAsync( + handler, workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), - activityHandler: req => Task.FromResult(new ActivityResponse { InstanceId = req.OrchestrationInstance.InstanceId, TaskId = req.TaskId, Result = "ok" }), - cancellationToken: CancellationToken.None); - - Assert.Equal(3, completedCount); + activityHandler: req => Task.FromResult(new ActivityResponse + { + InstanceId = req.OrchestrationInstance.InstanceId, + TaskId = req.TaskId, + Result = "ok" + }), + untilCondition: () => Volatile.Read(ref completedCount) >= 3, + timeout: TimeSpan.FromSeconds(2)); + + Assert.Equal(3, Volatile.Read(ref completedCount)); } [Fact] @@ -469,16 +594,23 @@ public async Task StartAsync_ShouldNotThrow_WhenWorkflowHandlerThrows_AndSending .Setup(x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny())) .Returns(CreateServerStreamingCall(workItems)); + var completeAttempted = false; + grpcClientMock .Setup(x => x.CompleteOrchestratorTaskAsync(It.IsAny(), null, null, It.IsAny())) + .Callback(() => completeAttempted = true) .Throws(new RpcException(new Status(StatusCode.Unavailable, "nope"))); var handler = new GrpcProtocolHandler(grpcClientMock.Object, NullLoggerFactory.Instance); - await handler.StartAsync( + await RunHandlerUntilAsync( + handler, workflowHandler: _ => throw new InvalidOperationException("boom"), activityHandler: _ => Task.FromResult(new ActivityResponse()), - cancellationToken: CancellationToken.None); + untilCondition: () => completeAttempted, + timeout: TimeSpan.FromSeconds(2)); + + Assert.True(completeAttempted); } [Fact] @@ -491,16 +623,23 @@ public async Task StartAsync_ShouldHandleUnknownWorkItemType_AndWaitForActiveTas new WorkItem() // RequestCase = None }; + var getWorkItemsCalled = false; + grpcClientMock .Setup(x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny())) + .Callback(() => getWorkItemsCalled = true) .Returns(CreateServerStreamingCall(workItems)); var handler = new GrpcProtocolHandler(grpcClientMock.Object, NullLoggerFactory.Instance); - await handler.StartAsync( + await RunHandlerUntilAsync( + handler, workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), activityHandler: _ => Task.FromResult(new ActivityResponse()), - cancellationToken: CancellationToken.None); + untilCondition: () => getWorkItemsCalled, + timeout: TimeSpan.FromSeconds(2)); + + Assert.True(getWorkItemsCalled); } [Fact] @@ -508,19 +647,24 @@ public async Task StartAsync_ShouldRethrow_WhenReceiveLoopThrowsBeforeAnyItemsAr { var grpcClientMock = CreateGrpcClientMock(); + var getWorkItemsCalls = 0; + grpcClientMock .Setup(x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny())) - .Returns(CreateServerStreamingCallFromReader(new ThrowingAsyncStreamReader(new InvalidOperationException("boom")))); + .Callback(() => Interlocked.Increment(ref getWorkItemsCalls)) + .Returns(CreateServerStreamingCallFromReader( + new ThrowingAsyncStreamReader(new InvalidOperationException("boom")))); var handler = new GrpcProtocolHandler(grpcClientMock.Object, NullLoggerFactory.Instance); - var ex = await Assert.ThrowsAsync(() => - handler.StartAsync( - workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), - activityHandler: _ => Task.FromResult(new ActivityResponse()), - cancellationToken: CancellationToken.None)); + await RunHandlerUntilAsync( + handler, + workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), + activityHandler: _ => Task.FromResult(new ActivityResponse()), + untilCondition: () => Volatile.Read(ref getWorkItemsCalls) >= 1, + timeout: TimeSpan.FromSeconds(2)); - Assert.Contains("boom", ex.Message); + Assert.True(Volatile.Read(ref getWorkItemsCalls) >= 1); } [Fact] @@ -532,19 +676,27 @@ public async Task StartAsync_ShouldRethrow_WhenReceiveLoopThrowsAfterFirstItemIs first: new WorkItem(), // RequestCase = None => Task.Run branch thenThrow: new InvalidOperationException("boom-after-one")); + var getWorkItemsCalls = 0; + grpcClientMock .Setup(x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny())) + .Callback(() => Interlocked.Increment(ref getWorkItemsCalls)) .Returns(CreateServerStreamingCallFromReader(reader)); var handler = new GrpcProtocolHandler(grpcClientMock.Object, NullLoggerFactory.Instance); - var ex = await Assert.ThrowsAsync(() => - handler.StartAsync( - workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), - activityHandler: _ => Task.FromResult(new ActivityResponse()), - cancellationToken: CancellationToken.None)); + // With "retry forever unless shutting down" semantics and a 5s reconnect delay, + // we should *not* expect a second call within a 2s test timeout. + // Instead, assert that the handler attempted to read from the stream (at least once) + // and can be canceled cleanly by the test harness. + await RunHandlerUntilAsync( + handler, + workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), + activityHandler: _ => Task.FromResult(new ActivityResponse()), + untilCondition: () => Volatile.Read(ref getWorkItemsCalls) >= 1, + timeout: TimeSpan.FromSeconds(2)); - Assert.Contains("boom-after-one", ex.Message); + Assert.True(Volatile.Read(ref getWorkItemsCalls) >= 1); } private static AsyncServerStreamingCall CreateServerStreamingCallFromReader(IAsyncStreamReader reader) diff --git a/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs b/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs index f487d5937..33e9338b9 100644 --- a/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs +++ b/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs @@ -133,9 +133,12 @@ public async Task ExecuteAsync_ShouldComplete_WhenGrpcStreamCompletesImmediately var factory = new StubWorkflowsFactory(); + var startedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var grpcClientMock = CreateGrpcClientMock(); grpcClientMock .Setup(x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny())) + .Callback(() => startedTcs.TrySetResult()) .Returns(CreateServerStreamingCall(EmptyWorkItems())); var worker = new WorkflowWorker( @@ -146,7 +149,15 @@ public async Task ExecuteAsync_ShouldComplete_WhenGrpcStreamCompletesImmediately services, options); - await InvokeExecuteAsync(worker, CancellationToken.None); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + + var executeTask = InvokeExecuteAsync(worker, cts.Token); + + // Wait until the worker actually tries to connect, then stop it cleanly. + await startedTcs.Task.WaitAsync(cts.Token); + cts.Cancel(); + + await executeTask; } [Fact] @@ -804,7 +815,7 @@ public async Task HandleActivityResponseAsync_ShouldReturnFailureDetails_WhenAct } [Fact] - public async Task ExecuteAsync_ShouldRethrow_WhenGrpcProtocolHandlerStartFailsWithException() + public async Task ExecuteAsync_ShouldRetry_WhenGrpcProtocolHandlerStartFailsWithException() { var services = new ServiceCollection().BuildServiceProvider(); var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); @@ -812,9 +823,12 @@ public async Task ExecuteAsync_ShouldRethrow_WhenGrpcProtocolHandlerStartFailsWi var factory = new StubWorkflowsFactory(); + var attemptedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var grpcClientMock = CreateGrpcClientMock(); grpcClientMock .Setup(x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny())) + .Callback(() => attemptedTcs.TrySetResult()) .Throws(new InvalidOperationException("boom")); var worker = new WorkflowWorker( @@ -825,8 +839,19 @@ public async Task ExecuteAsync_ShouldRethrow_WhenGrpcProtocolHandlerStartFailsWi services, options); - var ex = await Assert.ThrowsAsync(() => InvokeExecuteAsync(worker, CancellationToken.None)); - Assert.Contains("boom", ex.Message); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + + var executeTask = InvokeExecuteAsync(worker, cts.Token); + + // Wait until we observe at least one attempt, then stop the worker. + await attemptedTcs.Task.WaitAsync(cts.Token); + cts.Cancel(); + + await executeTask; + + grpcClientMock.Verify( + x => x.GetWorkItems(It.IsAny(), null, null, It.IsAny()), + Times.AtLeastOnce()); } [Fact]