diff --git a/src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs b/src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs index c6504d4f6..47efc4508 100644 --- a/src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs +++ b/src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs @@ -19,6 +19,7 @@ using Dapr.Workflow.Registration; using Dapr.Workflow.Serialization; using Dapr.Workflow.Worker; +using Grpc.Net.ClientFactory; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -53,6 +54,17 @@ public DaprWorkflowBuilder WithSerializer(IWorkflowSerializer serializer) Services.Replace(ServiceDescriptor.Singleton(typeof(IWorkflowSerializer), serializer)); return this; } + + /// + /// Configures gRPC message size limits for the workflow sidecar client. + /// + /// Maximum receive message size in bytes. Must be greater than 0 when provided. + /// Maximum send message size in bytes. Must be greater than 0 when provided. + public DaprWorkflowBuilder WithGrpcMessageSizeLimits(int? maxReceiveMessageSize = null, int? maxSendMessageSize = null) + { + ConfigureWorkflowGrpcMessageSizeLimits(Services, maxReceiveMessageSize, maxSendMessageSize); + return this; + } /// /// Configures a custom workflow serializer using a factory method. @@ -127,7 +139,7 @@ public static DaprWorkflowBuilder AddDaprWorkflowClient( { ArgumentNullException.ThrowIfNull(services); - services.AddHttpClient(); + EnsureWorkflowGrpcClientRegistered(services); var registration = new Func(provider => { @@ -177,9 +189,6 @@ private static void AddDaprWorkflowCore( return factory; }); - // Necessary for the gRPC client factory - serviceCollection.AddHttpClient(); - // Register the internal WorkflowClient implementation serviceCollection.TryAddSingleton(sp => { @@ -191,7 +200,7 @@ private static void AddDaprWorkflowCore( }); // Register gRPC client for communicating with Dapr sidecar - serviceCollection.AddDaprWorkflowGrpcClient(grpcOptions => + EnsureWorkflowGrpcClientRegistered(serviceCollection, grpcOptions => { if (options.GrpcChannelOptions != null) { @@ -230,6 +239,16 @@ private static void AddDaprWorkflowCore(IServiceCollection serviceCollection, AddDaprWorkflowCore(serviceCollection, configure, configureClient: null, lifetime); } + private static void EnsureWorkflowGrpcClientRegistered(IServiceCollection services, Action? configureClient = null) + { + ArgumentNullException.ThrowIfNull(services); + + // Required by gRPC client factory internals + services.AddHttpClient(); + + services.AddDaprWorkflowGrpcClient(configureClient); + } + private static void RegisterWorkflowClientWithBuilder( IServiceCollection serviceCollection, Action configureClient, @@ -281,4 +300,60 @@ private static void RegisterWorkflowClient(IServiceCollection serviceCollection, throw new ArgumentOutOfRangeException(nameof(lifetime), lifetime, "Invalid service lifetime"); } } + + private static void ConfigureWorkflowGrpcMessageSizeLimits( + IServiceCollection services, + int? maxReceiveMessageSize, + int? maxSendMessageSize) + { + if (!maxReceiveMessageSize.HasValue && !maxSendMessageSize.HasValue) + { + return; + } + + ValidateGrpcMessageSize(maxReceiveMessageSize, nameof(maxReceiveMessageSize)); + ValidateGrpcMessageSize(maxSendMessageSize, nameof(maxSendMessageSize)); + + var clientType = typeof(TaskHubSidecarService.TaskHubSidecarServiceClient); + + services.PostConfigure(clientType.FullName!, options => + { + options.ChannelOptionsActions.Add(channelOptions => + { + if (maxReceiveMessageSize.HasValue) + { + channelOptions.MaxReceiveMessageSize = maxReceiveMessageSize.Value; + } + + if (maxSendMessageSize.HasValue) + { + channelOptions.MaxSendMessageSize = maxSendMessageSize.Value; + } + }); + }); + + services.PostConfigure(clientType.Name, options => + { + options.ChannelOptionsActions.Add(channelOptions => + { + if (maxReceiveMessageSize.HasValue) + { + channelOptions.MaxReceiveMessageSize = maxReceiveMessageSize.Value; + } + + if (maxSendMessageSize.HasValue) + { + channelOptions.MaxSendMessageSize = maxSendMessageSize.Value; + } + }); + }); + } + + private static void ValidateGrpcMessageSize(int? value, string paramName) + { + if (value.HasValue) + { + ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(value.Value, 0, paramName); + } + } } diff --git a/test/Dapr.Workflow.Test/WorkflowServiceCollectionExtensionsTests.cs b/test/Dapr.Workflow.Test/WorkflowServiceCollectionExtensionsTests.cs index 2284d30a3..ff7c4bfdb 100644 --- a/test/Dapr.Workflow.Test/WorkflowServiceCollectionExtensionsTests.cs +++ b/test/Dapr.Workflow.Test/WorkflowServiceCollectionExtensionsTests.cs @@ -245,6 +245,58 @@ public void AddDaprWorkflow_ShouldRegisterWorkflowWorker_AsHostedService() Assert.Contains(hostedDescriptors, d => d.ImplementationType == typeof(WorkflowWorker)); } + [Fact] + public void AddDaprWorkflowClient_WithGrpcMessageSizeLimits_ShouldApplyIntoGrpcClientFactoryOptions() + { + var services = new ServiceCollection(); + services.AddLogging(); + + services + .AddDaprWorkflowClient() + .WithGrpcMessageSizeLimits(maxReceiveMessageSize: 4321, maxSendMessageSize: 8765); + + var sp = services.BuildServiceProvider(); + + var monitor = sp.GetRequiredService>(); + var clientType = typeof(TaskHubSidecarService.TaskHubSidecarServiceClient); + + var grpcOptions = + monitor.Get(clientType.FullName!) + ?? monitor.Get(clientType.Name); + + if (grpcOptions.ChannelOptionsActions.Count == 0) + { + grpcOptions = monitor.Get(clientType.Name); + } + + Assert.NotNull(grpcOptions); + Assert.NotEmpty(grpcOptions.ChannelOptionsActions); + + var channelOptions = new GrpcChannelOptions(); + foreach (var action in grpcOptions.ChannelOptionsActions) + { + action(channelOptions); + } + + Assert.Equal(4321, channelOptions.MaxReceiveMessageSize); + Assert.Equal(8765, channelOptions.MaxSendMessageSize); + } + + [Theory] + [InlineData(0, 1024)] + [InlineData(1024, 0)] + [InlineData(-1, 1024)] + [InlineData(1024, -1)] + public void WithGrpcMessageSizeLimits_ShouldThrowArgumentOutOfRangeException_ForNonPositiveValues(int receive, int send) + { + var services = new ServiceCollection(); + services.AddDaprWorkflowClient(); + + var builder = new WorkflowServiceCollectionExtensions.DaprWorkflowBuilder(services); + + Assert.Throws(() => builder.WithGrpcMessageSizeLimits(receive, send)); + } + [Theory] [InlineData(ServiceLifetime.Singleton)] [InlineData(ServiceLifetime.Scoped)] @@ -261,6 +313,20 @@ public void AddDaprWorkflow_ShouldRegisterDaprWorkflowClient_WithConfiguredLifet Assert.Equal(lifetime, descriptor!.Lifetime); } + [Fact] + public void AddDaprWorkflowClient_ShouldResolve_GrpcTypedClient() + { + var services = new ServiceCollection(); + services.AddLogging(); + + services.AddDaprWorkflowClient(); + + var sp = services.BuildServiceProvider(); + var grpcClient = sp.GetService(); + + Assert.NotNull(grpcClient); + } + private sealed record SerializerDependency(string Value); private sealed class DependencyBasedSerializer(SerializerDependency dep) : IWorkflowSerializer