Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ internal sealed class GrpcProtocolHandler(
{
private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);
private static readonly TimeSpan KeepaliveInterval = TimeSpan.FromSeconds(30);

private readonly CancellationTokenSource _disposalCts = new();
private readonly ILogger<GrpcProtocolHandler> _logger = loggerFactory?.CreateLogger<GrpcProtocolHandler>() ?? throw new ArgumentNullException(nameof(loggerFactory));
private readonly TaskHubSidecarService.TaskHubSidecarServiceClient _grpcClient =
grpcClient ?? throw new ArgumentNullException(nameof(grpcClient));
private readonly int _maxConcurrentWorkItems = maxConcurrentWorkItems > 0 ? maxConcurrentWorkItems : throw new ArgumentOutOfRangeException(nameof(maxConcurrentWorkItems));
private readonly int _maxConcurrentActivities = maxConcurrentActivities > 0 ? maxConcurrentActivities : throw new ArgumentOutOfRangeException(nameof(maxConcurrentActivities));
private readonly SemaphoreSlim _orchestrationSemaphore = new(maxConcurrentWorkItems, maxConcurrentWorkItems);
private readonly SemaphoreSlim _activitySemaphore = new(maxConcurrentActivities, maxConcurrentActivities);

private AsyncServerStreamingCall<WorkItem>? _streamingCall;
private int _activeWorkItemCount;
Expand Down Expand Up @@ -221,6 +223,7 @@ private async Task ReceiveLoopAsync(
private async Task ProcessWorkflowAsync(OrchestratorRequest request, string completionToken,
Func<OrchestratorRequest, string, Task<OrchestratorResponse>> handler, CancellationToken cancellationToken)
{
await _orchestrationSemaphore.WaitAsync(cancellationToken);
var activeCount = Interlocked.Increment(ref _activeWorkItemCount);

try
Expand Down Expand Up @@ -252,6 +255,7 @@ private async Task ProcessWorkflowAsync(OrchestratorRequest request, string comp
}
finally
{
_orchestrationSemaphore.Release();
Interlocked.Decrement(ref _activeWorkItemCount);
}
}
Expand All @@ -262,6 +266,7 @@ private async Task ProcessWorkflowAsync(OrchestratorRequest request, string comp
private async Task ProcessActivityAsync(ActivityRequest request, string completionToken,
Func<ActivityRequest, string, Task<ActivityResponse>> handler, CancellationToken cancellationToken)
{
await _activitySemaphore.WaitAsync(cancellationToken);
var activeCount = Interlocked.Increment(ref _activeWorkItemCount);

try
Expand Down Expand Up @@ -296,6 +301,7 @@ private async Task ProcessActivityAsync(ActivityRequest request, string completi
}
finally
{
_activitySemaphore.Release();
Interlocked.Decrement(ref _activeWorkItemCount);
}
}
Expand Down Expand Up @@ -368,13 +374,15 @@ public async ValueTask DisposeAsync()
{
if (_disposalCts.IsCancellationRequested)
return;

_logger.LogGrpcProtocolHandlerDisposing();

await _disposalCts.CancelAsync();
_streamingCall?.Dispose();
_disposalCts.Dispose();

_orchestrationSemaphore.Dispose();
_activitySemaphore.Dispose();

_logger.LogGrpcProtocolHandlerDisposed();
}

Expand Down
203 changes: 203 additions & 0 deletions test/Dapr.IntegrationTest.Workflow/MaxConcurrentActivitiesTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.Testcontainers.Common;
using Dapr.Testcontainers.Harnesses;
using Dapr.Workflow;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace Dapr.IntegrationTest.Workflow;

public sealed class MaxConcurrentActivitiesTests
{
/// <summary>
/// Verifies that <see cref="WorkflowRuntimeOptions.MaxConcurrentActivities"/> = 1 limits
/// activity execution to a single concurrent activity even when the workflow fans out more.
/// </summary>
[Fact]
public async Task ShouldRespectMaxConcurrentActivitiesLimitOfOne()
{
const int limit = 1;
const int activityCount = 5;

var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
var workflowInstanceId = Guid.NewGuid().ToString();

await using var environment = await DaprTestEnvironment.CreateWithPooledNetworkAsync(
needsActorState: true,
cancellationToken: TestContext.Current.CancellationToken);
await environment.StartAsync(TestContext.Current.CancellationToken);

var harness = new DaprHarnessBuilder(componentsDir)
.WithEnvironment(environment)
.BuildWorkflow();

await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
.ConfigureServices(builder =>
{
builder.Services.AddDaprWorkflowBuilder(
configureRuntime: opt =>
{
opt.MaxConcurrentActivities = limit;
opt.RegisterWorkflow<FanOutWorkflow>();
opt.RegisterActivity<ConcurrencyTrackingActivity>();
},
configureClient: (sp, clientBuilder) =>
{
var config = sp.GetRequiredService<IConfiguration>();
var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"];
if (!string.IsNullOrEmpty(grpcEndpoint))
clientBuilder.UseGrpcEndpoint(grpcEndpoint);
});
})
.BuildAndStartAsync();

ConcurrencyTrackingActivity.Reset();

using var scope = testApp.CreateScope();
var daprWorkflowClient = scope.ServiceProvider.GetRequiredService<DaprWorkflowClient>();

await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(FanOutWorkflow), workflowInstanceId, activityCount);
var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(
workflowInstanceId, true, TestContext.Current.CancellationToken);

Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus);
Assert.True(
ConcurrencyTrackingActivity.MaxObservedConcurrency <= limit,
$"Expected max concurrent activities <= {limit}, but observed {ConcurrencyTrackingActivity.MaxObservedConcurrency}");
}

/// <summary>
/// Verifies that <see cref="WorkflowRuntimeOptions.MaxConcurrentActivities"/> = 3 allows up to
/// 3 concurrent activities and that all activities complete successfully.
/// </summary>
[Fact]
public async Task ShouldRespectMaxConcurrentActivitiesLimitOfThree()
{
const int limit = 3;
const int activityCount = 10;

var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
var workflowInstanceId = Guid.NewGuid().ToString();

await using var environment = await DaprTestEnvironment.CreateWithPooledNetworkAsync(
needsActorState: true,
cancellationToken: TestContext.Current.CancellationToken);
await environment.StartAsync(TestContext.Current.CancellationToken);

var harness = new DaprHarnessBuilder(componentsDir)
.WithEnvironment(environment)
.BuildWorkflow();

await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
.ConfigureServices(builder =>
{
builder.Services.AddDaprWorkflowBuilder(
configureRuntime: opt =>
{
opt.MaxConcurrentActivities = limit;
opt.RegisterWorkflow<FanOutWorkflow>();
opt.RegisterActivity<ConcurrencyTrackingActivity>();
},
configureClient: (sp, clientBuilder) =>
{
var config = sp.GetRequiredService<IConfiguration>();
var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"];
if (!string.IsNullOrEmpty(grpcEndpoint))
clientBuilder.UseGrpcEndpoint(grpcEndpoint);
});
})
.BuildAndStartAsync();

ConcurrencyTrackingActivity.Reset();

using var scope = testApp.CreateScope();
var daprWorkflowClient = scope.ServiceProvider.GetRequiredService<DaprWorkflowClient>();

await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(FanOutWorkflow), workflowInstanceId, activityCount);
var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(
workflowInstanceId, true, TestContext.Current.CancellationToken);

Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus);
Assert.True(
ConcurrencyTrackingActivity.MaxObservedConcurrency <= limit,
$"Expected max concurrent activities <= {limit}, but observed {ConcurrencyTrackingActivity.MaxObservedConcurrency}");
}

/// <summary>
/// Tracks the maximum number of concurrently executing activity instances using a shared
/// static counter. Each activity holds for a brief period so concurrent executions can
/// accumulate and be observed.
/// </summary>
private sealed class ConcurrencyTrackingActivity : WorkflowActivity<int, int>
{
private static int _currentConcurrent;
private static int _maxObservedConcurrent;
private static readonly object Lock = new();

public static int MaxObservedConcurrency
{
get
{
lock (Lock)
{
return _maxObservedConcurrent;
}
}
}

public static void Reset()
{
lock (Lock)
{
_currentConcurrent = 0;
_maxObservedConcurrent = 0;
}
}

public override async Task<int> RunAsync(WorkflowActivityContext context, int input)
{
lock (Lock)
{
_currentConcurrent++;
if (_currentConcurrent > _maxObservedConcurrent)
_maxObservedConcurrent = _currentConcurrent;
}

// Hold briefly so any concurrent activities can be observed accumulating.
await Task.Delay(TimeSpan.FromMilliseconds(300));

lock (Lock)
{
_currentConcurrent--;
}

return input;
}
}

private sealed class FanOutWorkflow : Workflow<int, int[]>
{
public override async Task<int[]> RunAsync(WorkflowContext context, int input)
{
var tasks = new Task<int>[input];
for (var i = 0; i < input; i++)
{
tasks[i] = context.CallActivityAsync<int>(nameof(ConcurrencyTrackingActivity), i);
}

return await Task.WhenAll(tasks);
}
}
}
Loading
Loading